http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
deleted file mode 100644
index 89baa98..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import 
org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-@SuppressWarnings({ "serial", "deprecation" })
-public class ReduceWrappingFunctionTest {
-
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testWrappedReduceObject() {
-               try {
-                       AtomicInteger methodCounter = new AtomicInteger();
-                       
-                       ReduceOperator reduceOp = ReduceOperator.builder(new 
TestReduceFunction(methodCounter)).build();
-                       
-                       RichFunction reducer = (RichFunction) 
reduceOp.getUserCodeWrapper().getUserCodeObject();
-                       
-                       // test the method invocations
-                       reducer.close();
-                       reducer.open(new Configuration());
-                       assertEquals(2, methodCounter.get());
-                       
-                       // prepare the reduce / combine tests
-                       final List<Record> target = new ArrayList<Record>();
-                       Collector<Record> collector = new Collector<Record>() {
-                               @Override
-                               public void collect(Record record) {
-                                       target.add(record);
-                               }
-                               @Override
-                               public void close() {}
-                       };
-                       
-                       List<Record> source = new ArrayList<Record>();
-                       source.add(new Record(new IntValue(42), new 
LongValue(11)));
-                       source.add(new Record(new IntValue(13), new 
LongValue(17)));
-                       
-                       // test reduce
-                       ((GroupReduceFunction<Record, Record>) 
reducer).reduce(source, collector);
-                       assertEquals(2, target.size());
-                       assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
-                       assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
-                       assertEquals(new IntValue(13), 
target.get(1).getField(0, IntValue.class));
-                       assertEquals(new LongValue(17), 
target.get(1).getField(1, LongValue.class));
-                       target.clear();
-                       
-                       // test combine
-                       ((GroupCombineFunction<Record, Record>) 
reducer).combine(source, collector);
-                       assertEquals(2, target.size());
-                       assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
-                       assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
-                       assertEquals(new IntValue(13), 
target.get(1).getField(0, IntValue.class));
-                       assertEquals(new LongValue(17), 
target.get(1).getField(1, LongValue.class));
-                       target.clear();
-                       
-                       // test the serialization
-                       SerializationUtils.clone((java.io.Serializable) 
reducer);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Test
-       public void testWrappedReduceClass() {
-               try {
-                       ReduceOperator reduceOp = 
ReduceOperator.builder(TestReduceFunction.class).build();
-                       
-                       UserCodeWrapper<GroupReduceFunction<Record, Record>> 
udf = reduceOp.getUserCodeWrapper();
-                       UserCodeWrapper<GroupReduceFunction<Record, Record>> 
copy = SerializationUtils.clone(udf);
-                       GroupReduceFunction<Record, Record> reducer = 
copy.getUserCodeObject();
-                       
-                       // prepare the reduce / combine tests
-                       final List<Record> target = new ArrayList<Record>();
-                       Collector<Record> collector = new Collector<Record>() {
-                               @Override
-                               public void collect(Record record) {
-                                       target.add(record);
-                               }
-                               @Override
-                               public void close() {}
-                       };
-                       
-                       List<Record> source = new ArrayList<Record>();
-                       source.add(new Record(new IntValue(42), new 
LongValue(11)));
-                       source.add(new Record(new IntValue(13), new 
LongValue(17)));
-                       
-                       // test reduce
-                       reducer.reduce(source, collector);
-                       assertEquals(2, target.size());
-                       assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
-                       assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
-                       assertEquals(new IntValue(13), 
target.get(1).getField(0, IntValue.class));
-                       assertEquals(new LongValue(17), 
target.get(1).getField(1, LongValue.class));
-                       target.clear();
-                       
-                       // test combine
-                       ((GroupCombineFunction<Record, Record>) 
reducer).combine(source, collector);
-                       assertEquals(2, target.size());
-                       assertEquals(new IntValue(42), 
target.get(0).getField(0, IntValue.class));
-                       assertEquals(new LongValue(11), 
target.get(0).getField(1, LongValue.class));
-                       assertEquals(new IntValue(13), 
target.get(1).getField(0, IntValue.class));
-                       assertEquals(new LongValue(17), 
target.get(1).getField(1, LongValue.class));
-                       target.clear();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testExtractSemantics() {
-               try {
-                       {
-                               ReduceOperator reduceOp = 
ReduceOperator.builder(new TestReduceFunction()).build();
-                               
-                               SingleInputSemanticProperties props = 
reduceOp.getSemanticProperties();
-                               FieldSet fw2 = 
props.getForwardingTargetFields(0, 2);
-                               FieldSet fw4 = 
props.getForwardingTargetFields(0, 4);
-                               assertNotNull(fw2);
-                               assertNotNull(fw4);
-                               assertEquals(1, fw2.size());
-                               assertEquals(1, fw4.size());
-                               assertTrue(fw2.contains(2));
-                               assertTrue(fw4.contains(4));
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCombinable() {
-               try {
-                       {
-                               ReduceOperator reduceOp = 
ReduceOperator.builder(new TestReduceFunction()).build();
-                               assertTrue(reduceOp.isCombinable());
-                       }
-                       {
-                               ReduceOperator reduceOp = 
ReduceOperator.builder(TestReduceFunction.class).build();
-                               assertTrue(reduceOp.isCombinable());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Combinable
-       @ConstantFields({2, 4})
-       public static class TestReduceFunction extends ReduceFunction {
-               
-               private final AtomicInteger methodCounter;
-               
-               private TestReduceFunction(AtomicInteger methodCounter) {
-                       this.methodCounter= methodCounter;
-               }
-               
-               public TestReduceFunction() {
-                       methodCounter = new AtomicInteger();
-               }
-               
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception {
-                       while (records.hasNext()) {
-                               out.collect(records.next());
-                       }
-               }
-               
-               @Override
-               public void close() throws Exception {
-                       methodCounter.incrementAndGet();
-                       super.close();
-               }
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       methodCounter.incrementAndGet();
-                       super.open(parameters);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
deleted file mode 100644
index 5ccfdd9..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvInputFormatTest {
-       
-       protected File tempFile;
-       
-       private final CsvInputFormat format = new CsvInputFormat();
-       
-       //Static variables for testing the removal of \r\n to \n
-       private static final String FIRST_PART = "That is the first part";
-               
-       private static final String SECOND_PART = "That is the second part";
-       
-       // 
--------------------------------------------------------------------------------------------
-       @Before
-       public void setup() {
-               format.setFilePath("file:///some/file/that/will/not/be/read");
-       }
-       
-       @After
-       public void setdown() throws Exception {
-               if (this.format != null) {
-                       this.format.close();
-               }
-               if (this.tempFile != null) {
-                       this.tempFile.delete();
-               }
-       }
-
-       @Test
-       public void testConfigureEmptyConfig() {
-               try {
-                       Configuration config = new Configuration();
-                       
-                       // empty configuration, plus no fields on the format 
itself is not valid
-                       try {
-                               format.configure(config);
-                               fail(); // should give an error
-                       } catch (IllegalConfigurationException e) {
-                               ; // okay
-                       }
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Test
-       public void readWithEmptyFieldInstanceParameters() {
-               try {
-                       final String fileContent = 
"abc|def|ghijk\nabc||hhg\n|||";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-
-                       format.setFieldDelimiter('|');
-                       format.setFieldTypes(StringValue.class, 
StringValue.class, StringValue.class);
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("abc", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("def", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("ghijk", record.getField(2, 
StringValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("abc", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("hhg", record.getField(2, 
StringValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(2, 
StringValue.class).getValue());
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void readWithEmptyFieldConfigParameters() {
-               try {
-                       final String fileContent = 
"abc|def|ghijk\nabc||hhg\n|||";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                               .field(StringValue.class, 
0).field(StringValue.class, 1).field(StringValue.class, 2);
-                       
-                       format.setFieldDelimiter("|");
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("abc", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("def", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("ghijk", record.getField(2, 
StringValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("abc", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("hhg", record.getField(2, 
StringValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals("", record.getField(0, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(1, 
StringValue.class).getValue());
-                       assertEquals("", record.getField(2, 
StringValue.class).getValue());
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testReadAll() throws IOException {
-               try {
-                       final String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-                       
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                               .fieldDelimiter('|')
-                               .field(IntValue.class, 0).field(IntValue.class, 
1).field(IntValue.class, 2)
-                               .field(IntValue.class, 3).field(IntValue.class, 
4);
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(111, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(222, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(333, record.getField(2, 
IntValue.class).getValue());
-                       assertEquals(444, record.getField(3, 
IntValue.class).getValue());
-                       assertEquals(555, record.getField(4, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(666, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(777, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(888, record.getField(2, 
IntValue.class).getValue());
-                       assertEquals(999, record.getField(3, 
IntValue.class).getValue());
-                       assertEquals(000, record.getField(4, 
IntValue.class).getValue());
-                       
-                       assertNull(format.nextRecord(record));
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testReadFirstN() throws IOException {
-               try {
-                       final String fileContent = 
"111|222|333|444|555|\n666|777|888|999|000|";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-                       
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                       .fieldDelimiter('|')
-                       .field(IntValue.class, 0).field(IntValue.class, 1);
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(111, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(222, record.getField(1, 
IntValue.class).getValue());
-                       boolean notParsed = false;
-                       try {
-                               record.getField(2, IntValue.class);
-                       } catch (IndexOutOfBoundsException ioo) {
-                               notParsed = true;
-                       }
-                       assertTrue(notParsed);
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(666, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(777, record.getField(1, 
IntValue.class).getValue());
-                       notParsed = false;
-                       try {
-                               record.getField(2, IntValue.class);
-                       } catch (IndexOutOfBoundsException ioo) {
-                               notParsed = true;
-                       }
-                       assertTrue(notParsed);
-                       
-                       assertNull(format.nextRecord(record));
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-               
-       }
-       
-       @Test
-       public void testReadSparse() throws IOException {
-               try {
-                       final String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-                       
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                               .fieldDelimiter('|')
-                               .field(IntValue.class, 0).field(IntValue.class, 
3).field(IntValue.class, 7);
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(111, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(444, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(888, record.getField(2, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(000, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(777, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(333, record.getField(2, 
IntValue.class).getValue());
-                       
-                       assertNull(format.nextRecord(record));
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testReadSparseShufflePosition() throws IOException {
-               try {
-                       final String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-                       final FileInputSplit split = 
createTempFile(fileContent);       
-               
-                       final Configuration parameters = new Configuration();
-                       
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                               .fieldDelimiter('|')
-                               .field(IntValue.class, 8).field(IntValue.class, 
1).field(IntValue.class, 3);
-                       
-                       format.configure(parameters);
-                       format.open(split);
-                       
-                       Record record = new Record();
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(999, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(222, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(444, record.getField(2, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(222, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(999, record.getField(1, 
IntValue.class).getValue());
-                       assertEquals(777, record.getField(2, 
IntValue.class).getValue());
-                       
-                       assertNull(format.nextRecord(record));
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       private FileInputSplit createTempFile(String content) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", "tmp");
-               this.tempFile.deleteOnExit();
-               
-               DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(tempFile));
-               dos.writeBytes(content);
-               dos.close();
-                       
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
-       }
-       
-       @Test
-       public void testWindowsLineEndRemoval() {
-               
-               //Check typical use case -- linux file is correct and it is set 
up to linuc(\n)
-               this.testRemovingTrailingCR("\n", "\n");
-               
-               //Check typical windows case -- windows file endings and file 
has windows file endings set up
-               this.testRemovingTrailingCR("\r\n", "\r\n");
-               
-               //Check problematic case windows file -- windows file 
endings(\r\n) but linux line endings (\n) set up
-               this.testRemovingTrailingCR("\r\n", "\n");
-               
-               //Check problematic case linux file -- linux file endings (\n) 
but windows file endings set up (\r\n)
-               //Specific setup for windows line endings will expect \r\n 
because it has to be set up and is not standard.
-       }
-       
-       private void testRemovingTrailingCR(String lineBreakerInFile, String 
lineBreakerSetup) {
-               File tempFile=null;
-               
-               String fileContent = CsvInputFormatTest.FIRST_PART + 
lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
-               
-               try {
-                       // create input file
-                       tempFile = File.createTempFile("CsvInputFormatTest", 
"tmp");
-                       tempFile.deleteOnExit();
-                       tempFile.setWritable(true);
-                       
-                       OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-                       wrt.write(fileContent);
-                       wrt.close();
-                       
-                       //Instantiate input format
-                       CsvInputFormat inputFormat = new CsvInputFormat();
-                       
-                       Configuration parameters = new Configuration();
-                       new CsvInputFormat.ConfigBuilder(null, parameters)
-                       .field(StringValue.class, 
0).filePath(tempFile.toURI().toString());
-                       
-                       
-                       inputFormat.configure(parameters);
-                       
-                       inputFormat.setDelimiter(lineBreakerSetup);
-                       
-                       FileInputSplit[] splits = 
inputFormat.createInputSplits(1);
-                                               
-                       inputFormat.open(splits[0]);
-                       
-                       Record record = new Record();
-                       
-                       Record result = inputFormat.nextRecord(record);
-                       
-                       assertNotNull("Expecting to not return null", result);
-                       
-                       
-                       
-                       assertEquals(FIRST_PART, result.getField(0, 
StringValue.class).getValue());
-                       
-                       result = inputFormat.nextRecord(record);
-                       
-                       assertNotNull("Expecting to not return null", result);
-                       assertEquals(SECOND_PART, result.getField(0, 
StringValue.class).getValue());
-                       
-               }
-               catch (Throwable t) {
-                       System.err.println("test failed with exception: " + 
t.getMessage());
-                       t.printStackTrace(System.err);
-                       fail("Test erroneous");
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
deleted file mode 100644
index 9eb794f..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvOutputFormatTest.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class CsvOutputFormatTest {
-
-       protected Configuration config;
-       
-       protected File tempFile;
-       
-       private final CsvOutputFormat format = new CsvOutputFormat();
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Before
-       public void setup() throws IOException {
-               this.tempFile = File.createTempFile("test_output", "tmp");
-               this.format.setOutputFilePath(new Path(tempFile.toURI()));
-               this.format.setWriteMode(WriteMode.OVERWRITE);
-       }
-       
-       @After
-       public void setdown() throws Exception {
-               if (this.format != null) {
-                       this.format.close();
-               }
-               if (this.tempFile != null) {
-                       this.tempFile.delete();
-               }
-       }
-       
-       @Test
-       public void testConfigure() 
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
-                       // check missing number of fields
-                       boolean validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       } catch(IllegalStateException ise) {
-                               validConfig = false;
-                       }
-                       assertFalse(validConfig);
-                       
-                       // check missing file parser
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       } catch(IllegalStateException ise) {
-                               validConfig = false;
-                       }
-                       assertFalse(validConfig);
-                       
-                       // check valid config
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
IntValue.class);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       }
-                       assertTrue(validConfig);
-                       
-                       // check invalid file parser config
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
3);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       }
-                       assertFalse(validConfig);
-                       
-                       // check valid config
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 2, 
StringValue.class);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       }
-                       assertTrue(validConfig);
-                       
-                       // check valid config
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                               System.out.println(iae.getMessage());
-                       }
-                       assertTrue(validConfig);
-                       
-                       // check invalid text pos config
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       }
-                       assertFalse(validConfig);
-                       
-                       // check valid text pos config
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 3);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 2, 9);
-                       validConfig = true;
-                       try {
-                               format.configure(config);
-                       } catch(IllegalArgumentException iae) {
-                               validConfig = false;
-                       }
-                       assertTrue(validConfig);
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteNoRecPosNoLenient()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
IntValue.class);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               format.writeRecord(r);
-                               
-                               r.setField(0, new StringValue("AbCdE"));
-                               r.setField(1, new IntValue(13));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                               
-                               BufferedReader dis = new BufferedReader(new 
FileReader(tempFile));
-                               
-                               assertTrue((dis.readLine()+"\n").equals("Hello 
World|42\n"));
-                               
assertTrue((dis.readLine()+"\n").equals("AbCdE|13\n"));
-                               
-                               dis.close();
-                               
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteNoRecPosNoLenientFail()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
IntValue.class);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       boolean success = true;
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               format.writeRecord(r);
-                               
-                               r.setNull(0);
-                               r.setField(1, new IntValue(13));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                                                       
-                       } catch (IOException e) {
-                               success = false;
-                       } catch (RuntimeException re) {
-                               success = false;
-                       }
-                       
-                       assertFalse(success);
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteNoRecPosLenient()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
IntValue.class);
-                       config.setBoolean(CsvOutputFormat.LENIENT_PARSING, 
true);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               format.writeRecord(r);
-                               
-                               r.setNull(0);
-                               r.setField(1, new IntValue(13));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                               
-                               BufferedReader dis = new BufferedReader(new 
FileReader(tempFile));
-                               
-                               assertTrue((dis.readLine()+"\n").equals("Hello 
World|42\n"));
-                               
assertTrue((dis.readLine()+"\n").equals("|13\n"));
-                               
-                               dis.close();
-                               
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteRecPosNoLenient()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               r.setField(2, new StringValue("Hello User"));
-                               format.writeRecord(r);
-                               
-                               r.setField(0, new StringValue("AbCdE"));
-                               r.setField(1, new IntValue(13));
-                               r.setField(2, new StringValue("ZyXvW"));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                               
-                               BufferedReader dis = new BufferedReader(new 
FileReader(tempFile));
-                               
-                               assertTrue((dis.readLine()+"\n").equals("Hello 
User|Hello World\n"));
-                               
assertTrue((dis.readLine()+"\n").equals("ZyXvW|AbCdE\n"));
-                               
-                               dis.close();
-                               
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteRecPosNoLenientFail()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       boolean success = true;
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               r.setField(2, new StringValue("Hello User"));
-                               format.writeRecord(r);
-       
-                               r = new Record();
-                               
-                               r.setField(0, new StringValue("AbCdE"));
-                               r.setField(1, new IntValue(13));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                               
-                       } catch (IOException e) {
-                               success = false;
-                       } catch (RuntimeException re) {
-                               success = false;
-                       }
-                       
-                       assertFalse(success);
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-       
-       @Test
-       public void testWriteRecPosLenient()
-       {
-               try {
-                       Configuration config = new Configuration();
-                       
config.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, "|");
-                       config.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 
2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 2);
-                       
config.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, 
StringValue.class);
-                       
config.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 0);
-                       config.setBoolean(CsvOutputFormat.LENIENT_PARSING, 
true);
-                       
-                       format.configure(config);
-                       
-                       try {
-                               format.open(0, 1);
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-                       
-                       Record r = new Record(2);
-                       
-                       try {
-                               r.setField(0, new StringValue("Hello World"));
-                               r.setField(1, new IntValue(42));
-                               r.setField(2, new StringValue("Hello User"));
-                               format.writeRecord(r);
-       
-                               r = new Record();
-                               
-                               r.setField(0, new StringValue("AbCdE"));
-                               r.setField(1, new IntValue(13));
-                               format.writeRecord(r);
-                               
-                               format.close();
-                               
-                               BufferedReader dis = new BufferedReader(new 
FileReader(tempFile));
-                               
-                               assertTrue((dis.readLine()+"\n").equals("Hello 
User|Hello World\n"));
-                               
assertTrue((dis.readLine()+"\n").equals("|AbCdE\n"));
-                               
-                               dis.close();
-                               
-                       } catch (IOException e) {
-                               fail(e.getMessage());
-                       }
-               }
-               catch (Exception ex) {
-                       Assert.fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
-               }
-       }
-               
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
deleted file mode 100644
index 4aec38e..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormatTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import 
org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessFixedLengthInputFormatTest {
-
-private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> 
format;
-       
-       private final String neverEndingCommand = "cat /dev/urandom";
-       private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 
count=1000";
-       private final String incompleteRecordsCommand = "dd if=/dev/zero bs=7 
count=2";
-       private final String failingCommand = "ls /I/do/not/exist";
-       
-       @Before
-       public void prepare() {
-               format = new MyExternalProcessTestInputFormat();
-       }
-       
-       @Test
-       public void testOpen() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY,
 8);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-                               
-               boolean processDestroyed = false;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       
-                       String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | 
grep \"cat /dev/urandom\" | wc -l"};
-                       
-                       byte[] wcOut = new byte[128];
-                       Process p = Runtime.getRuntime().exec(cmd);
-                       p.getInputStream().read(wcOut);
-                       int pCnt = Integer.parseInt(new String(wcOut).trim());
-                       Assert.assertTrue(pCnt > 0);
-
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().equals("External process was 
destroyed although stream was not fully read.")) {
-                               processDestroyed = true;
-                       }
-               } finally {
-                       Assert.assertTrue(processDestroyed);
-               }
-       }
-       
-       @Test
-       public void testCheckExitCode() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY,
 8);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, failingCommand);
-               
-               format.configure(config);
-               boolean invalidExitCode = false;
-               try {
-                       format.open(split);
-                       format.waitForProcessToFinish();
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (InterruptedException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().startsWith("External process did not 
finish with an allowed exit code:")) {
-                               invalidExitCode = true; 
-                       }
-               }
-               Assert.assertTrue(invalidExitCode);
-               
-               invalidExitCode = false;
-               
config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
-               format.configure(config);
-               try {
-                       format.open(split);
-                       // wait for process to start...
-                       Thread.sleep(100);
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (InterruptedException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().startsWith("External process did not 
finish with an allowed exit code:")) {
-                               invalidExitCode = true; 
-                       }
-               }
-               Assert.assertTrue(!invalidExitCode);
-               
-       }
-       
-       @Test
-       public void testUserCodeTermination() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY,
 8);
-               
config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 
100);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-               Record record = new Record();
-                               
-               boolean userException = false;
-               boolean processDestroyed = false;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       while(!format.reachedEnd()) {
-                               try {
-                                       format.nextRecord(record);
-                               } catch(RuntimeException re) {
-                                       userException = true;
-                                       break;
-                               }
-                       }
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().equals("External process was 
destroyed although stream was not fully read.")) {
-                               processDestroyed = true;
-                       }
-               } finally {
-                       Assert.assertTrue(userException && processDestroyed);
-               }
-       }
-       
-       @Test
-       public void testReadStream() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY,
 8);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
-               Record record = new Record();
-
-               int cnt = 0;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       while(!format.reachedEnd()) {
-                               if (format.nextRecord(record) != null) {
-                                       cnt++;
-                               }
-                       }
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       Assert.fail(e.getMessage());
-               }
-               Assert.assertTrue(cnt == 1000);
-       }
-       
-       @Test
-       public void testReadInvalidStream() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(ExternalProcessFixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY,
 8);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.incompleteRecordsCommand);
-               Record record = new Record();
-
-               boolean incompleteRecordDetected = false;
-               @SuppressWarnings("unused")
-               int cnt = 0;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       while(!format.reachedEnd()) {
-                               if (format.nextRecord(record) != null) {
-                                       cnt++;
-                               }
-                       }
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().equals("External process produced 
incomplete record")) {
-                               incompleteRecordDetected = true;
-                       } else {
-                               Assert.fail(e.getMessage());
-                       }
-               }
-               Assert.assertTrue(incompleteRecordDetected);
-       }
-       
-       private final class MyExternalProcessTestInputFormat extends 
ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> {
-               private static final long serialVersionUID = 1L;
-
-               public static final String FAILCOUNT_PARAMETER_KEY = 
"test.failingCount";
-               
-               private long cnt = 0;
-               private int failCnt;
-               
-               @Override
-               public void configure(Configuration parameters) {
-                       super.configure(parameters);
-                       failCnt = 
parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
-               }
-               
-               @Override
-               public boolean readBytes(Record record, byte[] bytes, int 
startPos) {
-
-                       if(cnt == failCnt) {
-                               throw new RuntimeException("This is a test 
exception!");
-                       }
-                       
-                       int v1 = 0;
-                       v1 = v1        | (0xFF & bytes[startPos+0]);
-                       v1 = (v1 << 8) | (0xFF & bytes[startPos+1]);
-                       v1 = (v1 << 8) | (0xFF & bytes[startPos+2]);
-                       v1 = (v1 << 8) | (0xFF & bytes[startPos+3]);
-                       
-                       int v2 = 0;
-                       v2 = v2        | (0xFF & bytes[startPos+4]);
-                       v2 = (v2 << 8) | (0xFF & bytes[startPos+5]);
-                       v2 = (v2 << 8) | (0xFF & bytes[startPos+6]);
-                       v2 = (v2 << 8) | (0xFF & bytes[startPos+7]);
-                       
-                       record.setField(0,new IntValue(v1));
-                       record.setField(1,new IntValue(v2));
-                       
-                       cnt++;
-                       
-                       return true;
-               }
-
-               @Override
-               public ExternalProcessInputSplit[] createInputSplits(int 
minNumSplits)
-                               throws IOException {
-                       return null;
-               }
-
-               @Override
-               public DefaultInputSplitAssigner 
getInputSplitAssigner(GenericInputSplit[] splits) {
-                       return new DefaultInputSplitAssigner(splits);
-               }
-
-               @Override
-               public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
deleted file mode 100644
index 6b8cacb..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormatTest.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.record.io.ExternalProcessInputFormat;
-import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExternalProcessInputFormatTest {
-
-       private ExternalProcessInputFormat<ExternalProcessInputSplit> format;
-       
-       private final String neverEndingCommand = "cat /dev/urandom";
-       private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 
count=1000";
-       private final String failingCommand = "ls /I/do/not/exist";
-       
-       @Before
-       public void prepare() {
-               format = new MyExternalProcessTestInputFormat();
-       }
-       
-       @Test
-       public void testOpen() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-               
-               boolean processDestroyed = false;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       
-                       String[] cmd = {"/bin/sh","-c","ps aux | grep -v grep | 
grep \"cat /dev/urandom\" | wc -l"};
-                       
-                       byte[] wcOut = new byte[128];
-                       Process p = Runtime.getRuntime().exec(cmd);
-                       p.getInputStream().read(wcOut);
-                       int pCnt = Integer.parseInt(new String(wcOut).trim());
-                       Assert.assertTrue(pCnt > 0);
-
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().equals("External process was 
destroyed although stream was not fully read.")) {
-                               processDestroyed = true;
-                       }
-               } finally {
-                       Assert.assertTrue(processDestroyed);
-               }
-       }
-       
-       @Test
-       public void testCheckExitCode() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, failingCommand);
-               
-               format.configure(config);
-               boolean invalidExitCode = false;
-               try {
-                       format.open(split);
-                       format.waitForProcessToFinish();
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (InterruptedException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().startsWith("External process did not 
finish with an allowed exit code:")) {
-                               invalidExitCode = true; 
-                       }
-               }
-               Assert.assertTrue(invalidExitCode);
-               
-               invalidExitCode = false;
-               
config.setString(ExternalProcessInputFormat.ALLOWEDEXITCODES_PARAMETER_KEY,"0,1,2");
-               format.configure(config);
-               try {
-                       format.open(split);
-                       format.waitForProcessToFinish();
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (InterruptedException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().startsWith("External process did not 
finish with an allowed exit code:")) {
-                               invalidExitCode = true; 
-                       }
-               }
-               Assert.assertTrue(!invalidExitCode);
-               
-       }
-       
-       @Test
-       public void testUserCodeTermination() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               
config.setInteger(MyExternalProcessTestInputFormat.FAILCOUNT_PARAMETER_KEY, 
100);
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
-               Record record = new Record();
-                               
-               boolean userException = false;
-               boolean processDestroyed = false;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       while(!format.reachedEnd()) {
-                               try {
-                                       format.nextRecord(record);
-                               } catch(RuntimeException re) {
-                                       userException = true;
-                                       break;
-                               }
-                       }
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       if(e.getMessage().equals("External process was 
destroyed although stream was not fully read.")) {
-                               processDestroyed = true;
-                       }
-               } finally {
-                       Assert.assertTrue(userException && processDestroyed);
-               }
-       }
-       
-       @Test
-       public void testReadStream() {
-               
-               if(OperatingSystem.isWindows()) {
-                       return;
-               }
-               
-               Configuration config = new Configuration();
-               ExternalProcessInputSplit split = new 
ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
-               Record record = new Record();
-
-               int cnt = 0;
-               try {
-                       format.configure(config);
-                       format.open(split);
-                       while(!format.reachedEnd()) {
-                               if (format.nextRecord(record) != null) {
-                                       cnt++;
-                               }
-                       }
-                       format.close();
-               } catch (IOException e) {
-                       Assert.fail();
-               } catch (RuntimeException e) {
-                       Assert.fail(e.getMessage());
-               }
-               Assert.assertTrue("Expected read count was 1000, actual read 
count was "+cnt, cnt == 1000);
-       }
-       
-       private final class MyExternalProcessTestInputFormat extends 
ExternalProcessInputFormat<ExternalProcessInputSplit> {
-               private static final long serialVersionUID = 1L;
-
-               public static final String FAILCOUNT_PARAMETER_KEY = 
"test.failingCount";
-               
-               private byte[] buf = new byte[8];
-               
-               private long cnt = 0;
-               private int failCnt;
-               private boolean end;
-               
-               @Override
-               public void configure(Configuration parameters) {
-                       super.configure(parameters);
-                       failCnt = 
parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
-               }
-               
-               @Override
-               public void open(GenericInputSplit split) throws IOException {
-                       super.open(split);
-                       
-                       this.end = false;
-               }
-               
-               @Override
-               public ExternalProcessInputSplit[] createInputSplits(int 
minNumSplits) {
-                       return null;
-               }
-
-               @Override
-               public DefaultInputSplitAssigner 
getInputSplitAssigner(GenericInputSplit[] splits) {
-                       return new DefaultInputSplitAssigner(splits);
-               }
-               
-               @Override
-               public BaseStatistics getStatistics(BaseStatistics 
cachedStatistics) {
-                       return null;
-               }
-
-               @Override
-               public Record nextRecord(Record reuse) throws IOException {
-                       
-                       if(cnt > failCnt) {
-                               throw new RuntimeException("This is a test 
exception!");
-                       }
-                       
-                       int totalReadCnt = 0;
-                       
-                       do {
-                               int readCnt = super.extProcOutStream.read(buf, 
totalReadCnt, buf.length-totalReadCnt);
-                               
-                               if(readCnt == -1) {
-                                       this.end = true;
-                                       return null;
-                               } else {
-                                       totalReadCnt += readCnt;
-                               }
-                               
-                       } while(totalReadCnt != 8);
-                               
-                       int v1 = 0;
-                       v1 = v1        | (0xFF & buf[0]);
-                       v1 = (v1 << 8) | (0xFF & buf[1]);
-                       v1 = (v1 << 8) | (0xFF & buf[2]);
-                       v1 = (v1 << 8) | (0xFF & buf[3]);
-                       
-                       int v2 = 0;
-                       v2 = v2        | (0xFF & buf[4]);
-                       v2 = (v2 << 8) | (0xFF & buf[5]);
-                       v2 = (v2 << 8) | (0xFF & buf[6]);
-                       v2 = (v2 << 8) | (0xFF & buf[7]);
-                       
-                       reuse.setField(0,new IntValue(v1));
-                       reuse.setField(1,new IntValue(v2));
-                       
-                       this.cnt++;
-                       
-                       return reuse;
-               }
-
-               @Override
-               public boolean reachedEnd() throws IOException {
-                       return this.end;
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
deleted file mode 100644
index 3d441e6..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FixedLenghtInputFormatTest {
-
-       protected Configuration config;
-       
-       protected File tempFile;
-       
-       private final FixedLengthInputFormat format = new 
MyFixedLengthInputFormat();
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       @Before
-       public void setup() {
-               format.setFilePath("file:///some/file/that/will/not/be/read");
-       }
-       
-       @After
-       public void setdown() throws Exception {
-               if (this.format != null) {
-                       this.format.close();
-               }
-               if (this.tempFile != null) {
-                       this.tempFile.delete();
-               }
-       }
-
-       @Test
-       public void testOpen() throws IOException {
-               final int[] fileContent = {1,2,3,4,5,6,7,8};
-               final FileInputSplit split = createTempFile(fileContent);       
-       
-               final Configuration parameters = new Configuration();
-               
parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-               
-               format.configure(parameters);
-               format.open(split);
-               assertEquals(0, format.getSplitStart());
-               assertEquals(0, format.getReadBufferSize() % 8);
-               format.close();
-
-               
parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 13);
-               format.configure(parameters);
-               format.close();
-               format.open(split);
-               assertEquals(0, format.getReadBufferSize() % 13);
-               format.close();
-               
-               
parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 27);
-               format.configure(parameters);
-               format.close();
-               format.open(split);
-               assertEquals(0, format.getReadBufferSize() % 27);
-               format.close();
-               
-       }
-       
-       @Test
-       public void testRead() throws IOException {
-               final int[] fileContent = {1,2,3,4,5,6,7,8};
-               final FileInputSplit split = createTempFile(fileContent);
-               
-               final Configuration parameters = new Configuration();
-               
-               
parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-               
-               format.configure(parameters);
-               format.open(split);
-               
-               Record record = new Record();
-               
-               assertNotNull(format.nextRecord(record));
-               assertEquals(1, record.getField(0, IntValue.class).getValue());
-               assertEquals(2, record.getField(1, IntValue.class).getValue()); 
-               
-               assertNotNull(format.nextRecord(record));
-               assertEquals(3, record.getField(0, IntValue.class).getValue());
-               assertEquals(4, record.getField(1, IntValue.class).getValue());
-               
-               assertNotNull(format.nextRecord(record));
-               assertEquals(5, record.getField(0, IntValue.class).getValue());
-               assertEquals(6, record.getField(1, IntValue.class).getValue());
-               
-               assertNotNull(format.nextRecord(record));
-               assertEquals(7, record.getField(0, IntValue.class).getValue());
-               assertEquals(8, record.getField(1, IntValue.class).getValue());
-               
-               assertNull(format.nextRecord(record));
-               assertTrue(format.reachedEnd());
-       }
-       
-       
-       @Test
-       public void testReadFail() throws IOException {
-               final int[] fileContent = {1,2,3,4,5,6,7,8,9};
-               final FileInputSplit split = createTempFile(fileContent);
-               
-               final Configuration parameters = new Configuration();
-               
parameters.setInteger(FixedLengthInputFormat.RECORDLENGTH_PARAMETER_KEY, 8);
-               
-               format.configure(parameters);
-               format.open(split);
-               
-               Record record = new Record();
-
-               try {
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(1, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(2, record.getField(1, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(3, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(4, record.getField(1, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(5, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(6, record.getField(1, 
IntValue.class).getValue());
-                       
-                       assertNotNull(format.nextRecord(record));
-                       assertEquals(7, record.getField(0, 
IntValue.class).getValue());
-                       assertEquals(8, record.getField(1, 
IntValue.class).getValue());
-                       
-                       assertNull(format.nextRecord(record));
-               } catch(IOException ioe) {
-                       assertTrue(ioe.getMessage().equals("Unable to read full 
record"));
-               }
-       }
-       
-       
-       private FileInputSplit createTempFile(int[] contents) throws 
IOException {
-               this.tempFile = File.createTempFile("test_contents", "tmp");
-               this.tempFile.deleteOnExit();
-               
-               DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(tempFile));
-               
-               for(int i : contents) {
-                       dos.writeInt(i);
-               }
-               
-               dos.close();
-                       
-               return new FileInputSplit(0, new 
Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] 
{"localhost"});
-       }
-               
-       
-       private final class MyFixedLengthInputFormat extends 
FixedLengthInputFormat {
-               private static final long serialVersionUID = 1L;
-
-               IntValue p1 = new IntValue();
-               IntValue p2 = new IntValue();
-               
-               @Override
-               public boolean readBytes(Record target, byte[] buffer, int 
startPos) {
-                       int v1 = 0;
-                       v1 = (v1 | buffer[startPos+0]) << 8;
-                       v1 = (v1 | buffer[startPos+1]) << 8;
-                       v1 = (v1 | buffer[startPos+2]) << 8;
-                       v1 = (v1 | buffer[startPos+3]);
-                       p1.setValue(v1);
-                       
-                       int v2 = 0;
-                       v2 = (v2 | buffer[startPos+4]) << 8;
-                       v2 = (v2 | buffer[startPos+5]) << 8;
-                       v2 = (v2 | buffer[startPos+6]) << 8;
-                       v2 = (v2 | buffer[startPos+7]);
-                       p2.setValue(v2);
-                       
-                       target.setField(0, p1);
-                       target.setField(1, p2);
-                       
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
deleted file mode 100644
index 8ca19cf..0000000
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.OutputStreamWriter;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.junit.Test;
-
-public class TextInputFormatTest {
-       /**
-        * The TextInputFormat seems to fail reading more than one record. I 
guess its
-        * an off by one error.
-        * 
-        * The easiest workaround is to 
setParameter(TextInputFormat.CHARSET_NAME, "ASCII");
-        */
-       @Test
-       public void testPositionBug() {
-               final String FIRST = "First line";
-               final String SECOND = "Second line";
-               
-               try {
-                       // create input file
-                       File tempFile = 
File.createTempFile("TextInputFormatTest", "tmp");
-                       tempFile.deleteOnExit();
-                       tempFile.setWritable(true);
-                       
-                       FileWriter writer = new FileWriter(tempFile);
-                       writer.append(FIRST).append('\n');
-                       writer.append(SECOND).append('\n');
-                       writer.close();
-                       
-                       TextInputFormat inputFormat = new TextInputFormat();
-                       inputFormat.setFilePath(tempFile.toURI().toString());
-                       
-                       Configuration parameters = new Configuration(); 
-                       inputFormat.configure(parameters);
-                       
-                       FileInputSplit[] splits = 
inputFormat.createInputSplits(1);
-                       assertTrue("expected at least one input split", 
splits.length >= 1);
-                       
-                       inputFormat.open(splits[0]);
-                       
-                       Record r = new Record();
-                       assertNotNull("Expecting first record here", 
inputFormat.nextRecord(r));
-                       assertEquals(FIRST, r.getField(0, 
StringValue.class).getValue());
-                       
-                       assertNotNull("Expecting second record 
here",inputFormat.nextRecord(r ));
-                       assertEquals(SECOND, r.getField(0, 
StringValue.class).getValue());
-                       
-                       assertNull("The input file is over", 
inputFormat.nextRecord(r));
-               }
-               catch (Throwable t) {
-                       System.err.println("test failed with exception: " + 
t.getMessage());
-                       t.printStackTrace(System.err);
-                       fail("Test erroneous");
-               }
-       }
-
-       
-       /**
-        * This tests cases when line ends with \r\n and \n is used as 
delimiter, the last \r should be removed 
-        */
-       @Test
-       public void testRemovingTrailingCR() {
-               
-               testRemovingTrailingCR("\n","\n");
-               testRemovingTrailingCR("\r\n","\n");
-               
-               testRemovingTrailingCR("|","|");
-               testRemovingTrailingCR("|","\n");
-
-       }
-
-       private void testRemovingTrailingCR(String lineBreaker,String 
delimiter) {
-               File tempFile;
-               
-               String FIRST = "First line";
-               String SECOND = "Second line";
-               String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker;
-               
-               try {
-                       // create input file
-                       tempFile = File.createTempFile("TextInputFormatTest", 
"tmp");
-                       tempFile.deleteOnExit();
-                       tempFile.setWritable(true);
-                       
-                       OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-                       wrt.write(CONTENT);
-                       wrt.close();
-                       
-                       TextInputFormat inputFormat = new TextInputFormat();
-                       inputFormat.setFilePath(tempFile.toURI().toString());
-                       
-                       Configuration parameters = new Configuration(); 
-                       inputFormat.configure(parameters);
-                       
-                       inputFormat.setDelimiter(delimiter);
-                       
-                       FileInputSplit[] splits = 
inputFormat.createInputSplits(1);
-                                               
-                       inputFormat.open(splits[0]);
-                       
-                       Record r = new Record();
-                       if ( (delimiter.equals("\n") && 
(lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) )
-                                       || (lineBreaker.equals(delimiter)) ){
-
-                               assertNotNull("Expecting first record here", 
inputFormat.nextRecord(r));
-                               assertEquals(FIRST, r.getField(0, 
StringValue.class).getValue());
-                               
-                               assertNotNull("Expecting second record 
here",inputFormat.nextRecord(r ));
-                               assertEquals(SECOND, r.getField(0, 
StringValue.class).getValue());
-                               
-                               assertNull("The input file is over", 
inputFormat.nextRecord(r));
-                       } else {
-                               assertNotNull("Expecting first record here", 
inputFormat.nextRecord(r));
-                               assertEquals(CONTENT, r.getField(0, 
StringValue.class).getValue());
-                       }
-                       
-                       
-               }
-               catch (Throwable t) {
-                       System.err.println("test failed with exception: " + 
t.getMessage());
-                       t.printStackTrace(System.err);
-                       fail("Test erroneous");
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 5ff9eaf..78d61d1 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -169,8 +169,7 @@ public abstract class CostEstimator {
                switch (n.getDriverStrategy()) {
                case NONE:
                case UNARY_NO_OP:
-               case BINARY_NO_OP:      
-               case COLLECTOR_MAP:
+               case BINARY_NO_OP:
                case MAP:
                case MAP_PARTITION:
                case FLAT_MAP:

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
deleted file mode 100644
index 9c1bcd3..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class CollectorMapNode extends SingleInputNode {
-       
-       private final List<OperatorDescriptorSingle> possibleProperties;
-
-       
-       public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
-               super(operator);
-               
-               this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
-       }
-
-       @Override
-       public String getOperatorName() {
-               return "Map";
-       }
-
-       @Override
-       protected List<OperatorDescriptorSingle> getPossibleProperties() {
-               return this.possibleProperties;
-       }
-
-       /**
-        * Computes the estimates for the Map operator. Map takes one value and transforms it into another value.
-        * The cardinality consequently stays the same.
-        */
-       @Override
-       protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-               this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
deleted file mode 100644
index bcd4d73..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class CollectorMapDescriptor extends OperatorDescriptorSingle {
-
-       @Override
-       public DriverStrategy getStrategy() {
-               return DriverStrategy.COLLECTOR_MAP;
-       }
-
-       @Override
-       public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-               return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP);
-       }
-
-       @Override
-       protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-               RequestedGlobalProperties rgp = new RequestedGlobalProperties();
-               rgp.setAnyDistribution();
-               return Collections.singletonList(rgp);
-       }
-
-       @Override
-       protected List<RequestedLocalProperties> createPossibleLocalProperties() {
-               return Collections.singletonList(new RequestedLocalProperties());
-       }
-       
-       @Override
-       public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
-               if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-                               gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
-               {
-                       gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
-               }
-               gProps.clearUniqueFieldCombinations();
-               return gProps;
-       }
-       
-       @Override
-       public LocalProperties computeLocalProperties(LocalProperties lProps) {
-               return lProps.clearUniqueFieldSets();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index aaabac5..fc5eb21 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -418,7 +418,6 @@ public class PlanJSONDumpGenerator {
                                locString = "No-Op";
                                break;
                                
-                       case COLLECTOR_MAP:
                        case MAP:
                                locString = "Map";
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
index d5ddf4d..1125c29 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -57,7 +57,6 @@ public class JsonMapper {
                        case UNARY_NO_OP:
                                return "No-Op";
 
-                       case COLLECTOR_MAP:
                        case MAP:
                                return "Map";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index bcdee14..3f3eae1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -47,7 +47,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
 import org.apache.flink.optimizer.dag.CoGroupNode;
 import org.apache.flink.optimizer.dag.CoGroupRawNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
 import org.apache.flink.optimizer.dag.CrossNode;
 import org.apache.flink.optimizer.dag.DagConnection;
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -144,9 +143,6 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
                else if (c instanceof MapPartitionOperatorBase) {
                        n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
                }
-               else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
-                       n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
-               }
                else if (c instanceof FlatMapOperatorBase) {
                        n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
deleted file mode 100644
index 60bc798..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Map task which is executed by a Task Manager. The task has a single
- * input and one or multiple outputs. It is provided with a MapFunction
- * implementation.
- * <p>
- * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method
- * of the MapFunction.
- * 
- * @see GenericCollectorMap
- * 
- * @param <IT> The mapper's input data type.
- * @param <OT> The mapper's output data type.
- */
-@SuppressWarnings("deprecation")
-public class CollectorMapDriver<IT, OT> implements Driver<GenericCollectorMap<IT, OT>, OT> {
-
-       private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class);
-
-
-       private TaskContext<GenericCollectorMap<IT, OT>, OT> taskContext;
-       
-       private volatile boolean running;
-
-       private boolean objectReuseEnabled = false;
-
-       @Override
-       public void setup(TaskContext<GenericCollectorMap<IT, OT>, OT> context) {
-               this.taskContext = context;
-               this.running = true;
-       }
-
-       @Override
-       public int getNumberOfInputs() {
-               return 1;
-       }
-
-       @Override
-       public Class<GenericCollectorMap<IT, OT>> getStubType() {
-               @SuppressWarnings("unchecked")
-               final Class<GenericCollectorMap<IT, OT>> clazz = (Class<GenericCollectorMap<IT, OT>>) (Class<?>) GenericCollectorMap.class;
-               return clazz;
-       }
-
-       @Override
-       public int getNumberOfDriverComparators() {
-               return 0;
-       }
-
-       @Override
-       public void prepare() {
-               ExecutionConfig executionConfig = taskContext.getExecutionConfig();
-               this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("CollectorMapDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
-               }
-       }
-
-       @Override
-       public void run() throws Exception {
-               // cache references on the stack
-               final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
-               final GenericCollectorMap<IT, OT> stub = this.taskContext.getStub();
-               final Collector<OT> output = this.taskContext.getOutputCollector();
-
-               if (objectReuseEnabled) {
-                       IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance();
-
-
-                       while (this.running && ((record = input.next(record)) != null)) {
-                               stub.map(record, output);
-                       }
-               } else {
-                       IT record;
-                       while (this.running && ((record = input.next()) != null)) {
-                               stub.map(record, output);
-                       }
-               }
-       }
-
-       @Override
-       public void cleanup() {
-               // mappers need no cleanup, since no strategies are used.
-       }
-
-       @Override
-       public void cancel() {
-               this.running = false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index b069f12..12da126 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -23,7 +23,6 @@ import static org.apache.flink.runtime.operators.DamBehavior.MATERIALIZING;
 import static org.apache.flink.runtime.operators.DamBehavior.PIPELINED;
 import org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver;
 
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver;
 import org.apache.flink.runtime.operators.chaining.ChainedMapDriver;
@@ -40,8 +39,6 @@ public enum DriverStrategy {
        // a binary no-op operator. non implementation available
        BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0),
 
-       // the old mapper
-       COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0),
        // the proper mapper
        MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0),
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
deleted file mode 100644
index 8900ed7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.chaining;
-
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.BatchTask;
-
-@SuppressWarnings("deprecation")
-public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
-
-       private GenericCollectorMap<IT, OT> mapper;
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void setup(AbstractInvokable parent) {
-               @SuppressWarnings("unchecked")
-               final GenericCollectorMap<IT, OT> mapper =
-                       BatchTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCollectorMap.class);
-               this.mapper = mapper;
-               mapper.setRuntimeContext(getUdfRuntimeContext());
-       }
-
-       @Override
-       public void openTask() throws Exception {
-               Configuration stubConfig = this.config.getStubParameters();
-               BatchTask.openUserCode(this.mapper, stubConfig);
-       }
-
-       @Override
-       public void closeTask() throws Exception {
-               BatchTask.closeUserCode(this.mapper);
-       }
-
-       @Override
-       public void cancelTask() {
-               try {
-                       this.mapper.close();
-               } catch (Throwable t) {
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       public RichFunction getStub() {
-               return this.mapper;
-       }
-
-       public String getTaskName() {
-               return this.taskName;
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void collect(IT record) {
-               try {
-                       this.mapper.map(record, this.outputCollector);
-               } catch (Exception ex) {
-                       throw new ExceptionInChainedStubException(this.taskName, ex);
-               }
-       }
-
-       @Override
-       public void close() {
-               this.outputCollector.close();
-       }
-}

Reply via email to