http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
deleted file mode 100644
index 6c4659d..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import 
org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import 
org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, 
Record, Record>> {
-       
-       private static final long HASH_MEM = 6*1024*1024;
-       
-       private static final long SORT_MEM = 3*1024*1024;
-
-       private static final int NUM_SORTER = 2;
-       
-       private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-       private final double bnljn_frac;
-
-       private final double hash_frac;
-       
-       @SuppressWarnings("unchecked")
-       private final RecordComparator comparator1 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ 
IntValue.class });
-       
-       @SuppressWarnings("unchecked")
-       private final RecordComparator comparator2 = new RecordComparator(
-               new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ 
IntValue.class });
-       
-       private final List<Record> outList = new ArrayList<Record>();
-       
-       
-       public MatchTaskTest(ExecutionConfig config) {
-               super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
-               bnljn_frac = 
(double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-               hash_frac = 
(double)HASH_MEM/this.getMemoryManager().getMemorySize();
-       }
-       
-       
-       @Test
-       public void testSortBoth1MatchTask() {
-               final int keyCnt1 = 20;
-               final int valCnt1 = 1;
-               
-               final int keyCnt2 = 10;
-               final int valCnt2 = 2;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertTrue("Resultset size was " + this.outList.size() + 
". Expected was " + expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testSortBoth2MatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 1;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 1;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testSortBoth3MatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 1;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testSortBoth4MatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 1;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testSortBoth5MatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testSortFirstMatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInputSorted(new UniformRecordGenerator(keyCnt1, 
valCnt1, false), this.comparator1.duplicate());
-                       addInput(new UniformRecordGenerator(keyCnt2, valCnt2, 
true));
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testSortSecondMatchTask() {
-
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       addInput(new UniformRecordGenerator(keyCnt1, valCnt1, 
true));
-                       addInputSorted(new UniformRecordGenerator(keyCnt2, 
valCnt2, false), this.comparator2.duplicate());
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testMergeMatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(this.outList);
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-               
-               int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               
-               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+expCnt, this.outList.size() == expCnt);
-               
-               this.outList.clear();
-               
-       }
-       
-       @Test
-       public void testFailingMatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               setOutput(new NirvanaOutputList());
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-               getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-               setNumFileHandlesForSort(4);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-               
-               try {
-                       testDriver(testTask, MockFailingMatchStub.class);
-                       Assert.fail("Driver did not forward Exception.");
-               } catch (ExpectedTestException e) {
-                       // good!
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("The test caused an exception.");
-               }
-       }
-       
-       @Test
-       public void testCancelMatchTaskWhileSort1() {
-               final int keyCnt = 20;
-               final int valCnt = 20;
-               
-               try {
-                       setOutput(new NirvanaOutputList());
-                       addDriverComparator(this.comparator1);
-                       addDriverComparator(this.comparator2);
-                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       
getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-                       setNumFileHandlesForSort(4);
-                       
-                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-                       
-                       try {
-                               addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-                               addInput(new UniformRecordGenerator(keyCnt, 
valCnt, true));
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                               Assert.fail("The test caused an exception.");
-                       }
-       
-                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileSort1()") {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testDriver(testTask, 
MockMatchStub.class);
-                                       }
-                                       catch (Throwable t) {
-                                               error.set(t);
-                                       }
-                               }
-                       };
-                       taskRunner.start();
-
-                       Thread.sleep(1000);
-
-                       cancel();
-                       taskRunner.interrupt();
-
-                       taskRunner.join(60000);
-
-                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
-
-                       Throwable taskError = error.get();
-                       if (taskError != null) {
-                               taskError.printStackTrace();
-                               fail("Error in task while canceling: " + 
taskError.getMessage());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCancelMatchTaskWhileSort2() {
-               final int keyCnt = 20;
-               final int valCnt = 20;
-               
-               try {
-                       setOutput(new NirvanaOutputList());
-                       addDriverComparator(this.comparator1);
-                       addDriverComparator(this.comparator2);
-                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       
getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-                       setNumFileHandlesForSort(4);
-                       
-                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-                       
-                       try {
-                               addInput(new UniformRecordGenerator(keyCnt, 
valCnt, true));
-                               addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-                       } catch (Exception e) {
-                               e.printStackTrace();
-                               Assert.fail("The test caused an exception.");
-                       }
-
-                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileSort2()") {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testDriver(testTask, 
MockMatchStub.class);
-                                       }
-                                       catch (Throwable t) {
-                                               error.set(t);
-                                       }
-                               }
-                       };
-                       taskRunner.start();
-
-                       Thread.sleep(1000);
-
-                       cancel();
-                       taskRunner.interrupt();
-
-                       taskRunner.join(60000);
-
-                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
-
-                       Throwable taskError = error.get();
-                       if (taskError != null) {
-                               taskError.printStackTrace();
-                               fail("Error in task while canceling: " + 
taskError.getMessage());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testCancelMatchTaskWhileMatching() {
-               final int keyCnt = 20;
-               final int valCnt = 20;
-               
-               try {
-                       setOutput(new NirvanaOutputList());
-                       addDriverComparator(this.comparator1);
-                       addDriverComparator(this.comparator2);
-                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-                       
getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-                       getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-                       setNumFileHandlesForSort(4);
-                       
-                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-                       
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
true));
-                       
-                       final AtomicReference<Throwable> error = new 
AtomicReference<>();
-                       
-                       Thread taskRunner = new Thread("Task runner for 
testCancelMatchTaskWhileMatching()") {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testDriver(testTask, 
MockDelayingMatchStub.class);
-                                       }
-                                       catch (Throwable t) {
-                                               error.set(t);
-                                       }
-                               }
-                       };
-                       taskRunner.start();
-                       
-                       Thread.sleep(1000);
-                       
-                       cancel();
-                       taskRunner.interrupt();
-                       
-                       taskRunner.join(60000);
-                       
-                       assertFalse("Task thread did not finish within 60 
seconds", taskRunner.isAlive());
-                       
-                       Throwable taskError = error.get();
-                       if (taskError != null) {
-                               taskError.printStackTrace();
-                               fail("Error in task while canceling: " + 
taskError.getMessage());
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testHash1MatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 1;
-               
-               int keyCnt2 = 10;
-               int valCnt2 = 2;
-                               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.outList);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.outList.size());
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testHash2MatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 1;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 1;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.outList);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.outList.size());
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testHash3MatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 1;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.outList);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.outList.size());
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testHash4MatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 1;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.outList);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.outList.size());
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testHash5MatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(this.outList);
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockMatchStub.class);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-               
-               final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-               Assert.assertEquals("Wrong result set size.", expCnt, 
this.outList.size());
-               this.outList.clear();
-       }
-       
-       @Test
-       public void testFailingHashFirstMatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(new NirvanaOutputList());
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockFailingMatchStub.class);
-                       Assert.fail("Function exception was not forwarded.");
-               } catch (ExpectedTestException etex) {
-                       // good!
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-       }
-       
-       @Test
-       public void testFailingHashSecondMatchTask() {
-               int keyCnt1 = 20;
-               int valCnt1 = 20;
-               
-               int keyCnt2 = 20;
-               int valCnt2 = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-               addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(new NirvanaOutputList());
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               try {
-                       testDriver(testTask, MockFailingMatchStub.class);
-                       Assert.fail("Function exception was not forwarded.");
-               } catch (ExpectedTestException etex) {
-                       // good!
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Test caused an exception.");
-               }
-       }
-       
-       @Test
-       public void testCancelHashMatchTaskWhileBuildFirst() {
-               final int keyCnt = 20;
-               final int valCnt = 20;
-
-               try {
-                       addInput(new DelayingInfinitiveInputIterator(100));
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
false));
-
-                       addDriverComparator(this.comparator1);
-                       addDriverComparator(this.comparator2);
-
-                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-                       setOutput(new NirvanaOutputList());
-
-                       
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-                       getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-
-                       final AtomicBoolean success = new AtomicBoolean(false);
-
-                       Thread taskRunner = new Thread() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testDriver(testTask, 
MockMatchStub.class);
-                                               success.set(true);
-                                       } catch (Exception ie) {
-                                               ie.printStackTrace();
-                                       }
-                               }
-                       };
-                       taskRunner.start();
-
-                       Thread.sleep(1000);
-                       cancel();
-
-                       try {
-                               taskRunner.join();
-                       }
-                       catch (InterruptedException ie) {
-                               Assert.fail("Joining threads failed");
-                       }
-
-                       Assert.assertTrue("Test threw an exception even though 
it was properly canceled.", success.get());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testHashCancelMatchTaskWhileBuildSecond() {
-               final int keyCnt = 20;
-               final int valCnt = 20;
-
-               try {
-                       addInput(new UniformRecordGenerator(keyCnt, valCnt, 
false));
-                       addInput(new DelayingInfinitiveInputIterator(100));
-
-                       addDriverComparator(this.comparator1);
-                       addDriverComparator(this.comparator2);
-
-                       
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-                       setOutput(new NirvanaOutputList());
-
-                       
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-                       getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-                       final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-
-                       final AtomicBoolean success = new AtomicBoolean(false);
-
-                       Thread taskRunner = new Thread() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               testDriver(testTask, 
MockMatchStub.class);
-                                               success.set(true);
-                                       } catch (Exception ie) {
-                                               ie.printStackTrace();
-                                       }
-                               }
-                       };
-                       taskRunner.start();
-
-                       Thread.sleep(1000);
-                       cancel();
-
-                       try {
-                               taskRunner.join();
-                       }
-                       catch (InterruptedException ie) {
-                               Assert.fail("Joining threads failed");
-                       }
-
-                       Assert.assertTrue("Test threw an exception even though 
it was properly canceled.", success.get());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testHashFirstCancelMatchTaskWhileMatching() {
-               int keyCnt = 20;
-               int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(new NirvanaOutputList());
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockMatchStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
-                               }
-                       }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
-               }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
-       }
-       
-       @Test
-       public void testHashSecondCancelMatchTaskWhileMatching() {
-               int keyCnt = 20;
-               int valCnt = 20;
-               
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-               addDriverComparator(this.comparator1);
-               addDriverComparator(this.comparator2);
-               
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-               setOutput(new NirvanaOutputList());
-               
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-               getTaskConfig().setRelativeMemoryDriver(hash_frac);
-               
-               final JoinDriver<Record, Record, Record> testTask = new 
JoinDriver<Record, Record, Record>();
-               
-               final AtomicBoolean success = new AtomicBoolean(false);
-               
-               Thread taskRunner = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       testDriver(testTask, 
MockMatchStub.class);
-                                       success.set(true);
-                               } catch (Exception ie) {
-                                       ie.printStackTrace();
-                               }
-                       }
-               };
-               taskRunner.start();
-               
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, 
this);
-               tct.start();
-               
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
-               }
-               
-               Assert.assertTrue("Test threw an exception even though it was 
properly canceled.", success.get());
-       }
-       
-       // 
=================================================================================================
-       
-       public static final class MockMatchStub extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void join(Record record1, Record record2, 
Collector<Record> out) throws Exception {
-                       out.collect(record1);
-               }
-       }
-       
-       public static final class MockFailingMatchStub extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               private int cnt = 0;
-               
-               @Override
-               public void join(Record record1, Record record2, 
Collector<Record> out) throws Exception {
-                       if (++this.cnt >= 10) {
-                               throw new ExpectedTestException();
-                       }
-                       out.collect(record1);
-               }
-       }
-       
-       public static final class MockDelayingMatchStub extends JoinFunction {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void join(Record record1, Record record2, 
Collector<Record> out) throws Exception {
-                       try {
-                               Thread.sleep(100);
-                       } catch (InterruptedException e) {
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index f59c4a3..415b6bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -26,9 +26,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -46,7 +46,7 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
        private final RecordComparator comparator = new RecordComparator(
                new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
        
-       private final List<Record> outList = new ArrayList<Record>();
+       private final List<Record> outList = new ArrayList<>();
        
        
        public ReduceTaskExternalITCase(ExecutionConfig config) {
@@ -68,7 +68,7 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                try {
                        addInputSorted(new UniformRecordGenerator(keyCnt, 
valCnt, false), this.comparator.duplicate());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                        
                        testDriver(testTask, MockReduceStub.class);
                } catch (Exception e) {
@@ -100,7 +100,7 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                try {
                        addInputSorted(new UniformRecordGenerator(keyCnt, 
valCnt, false), this.comparator.duplicate());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                        
                        testDriver(testTask, MockReduceStub.class);
                } catch (Exception e) {
@@ -130,14 +130,14 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                
                CombiningUnilateralSortMerger<Record> sorter = null;
                try {
-                       sorter = new CombiningUnilateralSortMerger<Record>(new 
MockCombiningReduceStub(), 
+                       sorter = new CombiningUnilateralSortMerger<>(new 
MockCombiningReduceStub(),
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(),
                                        this.perSortFractionMem,
                                        2, 0.8f, true);
                        addInput(sorter.getIterator());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                        testDriver(testTask, MockCombiningReduceStub.class);
                } catch (Exception e) {
@@ -176,14 +176,14 @@ public class ReduceTaskExternalITCase extends 
DriverTestBase<RichGroupReduceFunc
                
                CombiningUnilateralSortMerger<Record> sorter = null;
                try {
-                       sorter = new CombiningUnilateralSortMerger<Record>(new 
MockCombiningReduceStub(), 
+                       sorter = new CombiningUnilateralSortMerger<>(new 
MockCombiningReduceStub(),
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(),
                                        this.perSortFractionMem,
                                        2, 0.8f, false);
                        addInput(sorter.getIterator());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                        testDriver(testTask, MockCombiningReduceStub.class);
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index cc25c99..8bc7fe5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -27,9 +27,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import 
org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import 
org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
@@ -51,7 +52,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
        private final RecordComparator comparator = new RecordComparator(
                new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ 
IntValue.class });
        
-       private final List<Record> outList = new ArrayList<Record>();
+       private final List<Record> outList = new ArrayList<>();
 
        public ReduceTaskTest(ExecutionConfig config) {
                super(config, 0, 1, 3*1024*1024);
@@ -69,7 +70,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                try {
                        addInputSorted(new UniformRecordGenerator(keyCnt, 
valCnt, false), this.comparator.duplicate());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                        
                        testDriver(testTask, MockReduceStub.class);
                } catch (Exception e) {
@@ -96,7 +97,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                setOutput(this.outList);
                
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
                
-               GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+               GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                try {
                        testDriver(testTask, MockReduceStub.class);
@@ -125,13 +126,13 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                
                CombiningUnilateralSortMerger<Record> sorter = null;
                try {
-                       sorter = new CombiningUnilateralSortMerger<Record>(new 
MockCombiningReduceStub(), 
+                       sorter = new CombiningUnilateralSortMerger<>(new 
MockCombiningReduceStub(),
                                getMemoryManager(), getIOManager(), new 
UniformRecordGenerator(keyCnt, valCnt, false), 
                                getOwningNepheleTask(), 
RecordSerializerFactory.get(), this.comparator.duplicate(), 
this.perSortFractionMem,
                                        4, 0.8f, true);
                        addInput(sorter.getIterator());
                        
-                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+                       GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                        testDriver(testTask, MockCombiningReduceStub.class);
                } catch (Exception e) {
@@ -168,7 +169,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                setOutput(this.outList);
                
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
                
-               GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+               GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                try {
                        testDriver(testTask, MockFailingReduceStub.class);
@@ -190,7 +191,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                setOutput(new NirvanaOutputList());
                
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
                
-               final GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+               final GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                try {
                        addInputSorted(new 
DelayingInfinitiveInputIterator(100), this.comparator.duplicate());
@@ -238,7 +239,7 @@ public class ReduceTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Recor
                setOutput(new NirvanaOutputList());
                
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
                
-               final GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<Record, Record>();
+               final GroupReduceDriver<Record, Record> testTask = new 
GroupReduceDriver<>();
                
                final AtomicBoolean success = new AtomicBoolean(false);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 1f19699..542812c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -21,17 +21,17 @@ package org.apache.flink.runtime.operators.chaining;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub;
 import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -49,14 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
-@SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
        
        private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
 
        private static final int NETWORK_BUFFER_SIZE = 1024;
        
-       private final List<Record> outList = new ArrayList<Record>();
+       private final List<Record> outList = new ArrayList<>();
        
        @SuppressWarnings("unchecked")
        private final RecordComparatorFactory compFact = new 
RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new 
boolean[] {true});
@@ -95,16 +94,16 @@ public class ChainTaskTest extends TaskTestBase {
                                
combineConfig.setRelativeMemoryDriver(memoryFraction);
                                
                                // udf
-                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class));
+                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<>(MockReduceStub.class));
                                
                                
getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, 
combineConfig, "combine");
                        }
                        
                        // chained map+combine
                        {
-                               BatchTask<GenericCollectorMap<Record, Record>, 
Record> testTask =
-                                                                               
        new BatchTask<GenericCollectorMap<Record, Record>, Record>();
-                               registerTask(testTask, 
CollectorMapDriver.class, MockMapStub.class);
+                               BatchTask<FlatMapFunction<Record, Record>, 
Record> testTask =
+                                                                               
        new BatchTask<>();
+                               registerTask(testTask, FlatMapDriver.class, 
MockMapStub.class);
                                
                                try {
                                        testTask.invoke();
@@ -156,17 +155,17 @@ public class ChainTaskTest extends TaskTestBase {
                                
combineConfig.setRelativeMemoryDriver(memoryFraction);
                                
                                // udf
-                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class));
+                               combineConfig.setStubWrapper(new 
UserCodeClassWrapper<>(MockFailingCombineStub.class));
                                
                                
getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, 
combineConfig, "combine");
                        }
                        
                        // chained map+combine
                        {
-                               final BatchTask<GenericCollectorMap<Record, 
Record>, Record> testTask =
-                                                                               
        new BatchTask<GenericCollectorMap<Record, Record>, Record>();
+                               final BatchTask<FlatMapFunction<Record, 
Record>, Record> testTask =
+                                                                               
        new BatchTask<>();
                                
-                               super.registerTask(testTask, 
CollectorMapDriver.class, MockMapStub.class);
+                               super.registerTask(testTask, 
FlatMapDriver.class, MockMapStub.class);
        
                                boolean stubFailed = false;
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 777bfc8..63f54ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -94,7 +94,7 @@ public abstract class TaskTestBase extends TestLogger {
                
                final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
                config.setDriver(driver);
-               config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
+               config.setStubWrapper(new UserCodeClassWrapper<>(stubClass));
                
                task.setEnvironment(this.mockEnv);
 
@@ -116,17 +116,17 @@ public abstract class TaskTestBase extends TestLogger {
                }
        }
 
-       public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
+       public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat<Record>> stubClass, String outPath) {
                registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
        }
        
-       public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
+       public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat<Record> outputFormat, String outPath) {
                TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
                
                outputFormat.setOutputFilePath(new Path(outPath));
                outputFormat.setWriteMode(WriteMode.OVERWRITE);
 
-               dsConfig.setStubWrapper(new UserCodeObjectWrapper<FileOutputFormat>(outputFormat));
+               dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(outputFormat));
 
                outTask.setEnvironment(this.mockEnv);
 
@@ -139,9 +139,9 @@ public abstract class TaskTestBase extends TestLogger {
        }
 
        public void registerFileInputTask(AbstractInvokable inTask,
-                       Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
+                       Class<? extends DelimitedInputFormat<Record>> stubClass, String inPath, String delimiter)
        {
-               DelimitedInputFormat format;
+               DelimitedInputFormat<Record> format;
                try {
                        format = stubClass.newInstance();
                }
@@ -153,7 +153,7 @@ public abstract class TaskTestBase extends TestLogger {
                format.setDelimiter(delimiter);
                
                TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
-               dsConfig.setStubWrapper(new UserCodeObjectWrapper<DelimitedInputFormat>(format));
+               dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(format));
                
                this.inputSplitProvider.addInputSplits(inPath, 5);
 

Reply via email to