http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
deleted file mode 100644
index 3ce021b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationTerminationWithTwoTails extends RecordAPITestBase {
-
-       private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n";
-       private static final String EXPECTED = "22\n";
-
-       protected String dataPath;
-       protected String resultPath;
-
-       public IterationTerminationWithTwoTails(){
-               setTaskManagerNumSlots(parallelism);
-       }
-
-       @Override
-       protected void preSubmit() throws Exception {
-               dataPath = createTempFile("datapoints.txt", INPUT);
-               resultPath = getTempFilePath("result");
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED, resultPath);
-       }
-
-       @Override
-       protected Plan getTestJob() {
-               return getTestPlanPlan(parallelism, dataPath, resultPath);
-       }
-       
-       private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
-
-               FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-               
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(initialInput);
-               iteration.setMaximumNumberOfIterations(5);
-               Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
-
-               ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer())
-                               .input(iteration.getPartialSolution())
-                               .name("Compute sum (Reduce)")
-                               .build();
-               
-               iteration.setNextPartialSolution(sumReduce);
-               
-               MapOperator terminationMapper = MapOperator.builder(new TerminationMapper())
-                               .input(iteration.getPartialSolution())
-                               .name("Compute termination criterion (Map)")
-                               .build();
-               
-               iteration.setTerminationCriterion(terminationMapper);
-
-               FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
-               CsvOutputFormat.configureRecordFormat(finalResult)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter(' ')
-                       .field(StringValue.class, 0);
-
-               Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-               plan.setDefaultParallelism(4);
-               Assert.assertTrue(plan.getDefaultParallelism() > 1);
-               return plan;
-       }
-       
-       static final class SumReducer extends ReduceFunction implements Serializable {
-               
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
-                       // Compute the sum
-                       int sum = 0;
-                       
-                       while (it.hasNext()) {
-                               sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1;
-                       }
-                       
-                       out.collect(new Record(new StringValue(Integer.toString(sum))));
-               }
-       }
-       
-       public static class TerminationMapper extends MapFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void map(Record record, Collector<Record> collector) {
-                       
-                       int currentSum = Integer.parseInt(record.getField(0, StringValue.class).getValue());
-                       
-                       if(currentSum < 21)
-                               collector.collect(record);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
index cb16c15..ab66f31 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
@@ -18,91 +18,34 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
-       private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
        private static final String EXPECTED = "1\n";
 
-       protected String dataPath;
-       protected String resultPath;
-
-       public IterationWithAllReducerITCase(){
-               setTaskManagerNumSlots(4);
-       }
-
        @Override
-       protected void preSubmit() throws Exception {
-               dataPath = createTempFile("datapoints.txt", INPUT);
-               resultPath = getTempFilePath("result");
-       }
-       
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(EXPECTED, resultPath);
-       }
-
-       @Override
-       protected Plan getTestJob() {
-               Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
-               return plan;
-       }
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
 
-       
-       private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
+               DataSet<String> initialInput = env.fromElements("1", "1", "1", "1", "1", "1", "1", "1");
 
-               FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
-               
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(initialInput);
-               iteration.setMaximumNumberOfIterations(5);
-               
-               Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+               IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop");
 
-               ReduceOperator sumReduce = ReduceOperator.builder(new PickOneReducer())
-                               .input(iteration.getPartialSolution())
-                               .name("Compute sum (Reduce)")
-                               .build();
-               
-               iteration.setNextPartialSolution(sumReduce);
+               DataSet<String> sumReduce = iteration.reduce(new ReduceFunction<String>(){
+                       @Override
+                       public String reduce(String value1, String value2) throws Exception {
+                               return value1;
+                       }
+               }).name("Compute sum (Reduce)");
 
-               FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output");
-               CsvOutputFormat.configureRecordFormat(finalResult)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter(' ')
-                       .field(StringValue.class, 0);
+               List<String> result = iteration.closeWith(sumReduce).collect();
 
-               Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)");
-               
-               plan.setDefaultParallelism(numSubTasks);
-               Assert.assertTrue(plan.getDefaultParallelism() > 1);
-               
-               return plan;
-       }
-       
-       public static final class PickOneReducer extends ReduceFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
-                       out.collect(it.next());
-               }
+               compareResultAsText(result, EXPECTED);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
index c11c9ea..c283df1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java
@@ -18,43 +18,25 @@
 
 package org.apache.flink.test.iterative;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class IterationWithChainingITCase extends RecordAPITestBase {
+public class IterationWithChainingITCase extends JavaProgramTestBase {
 
        private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n";
 
        private String dataPath;
        private String resultPath;
 
-       public IterationWithChainingITCase(Configuration config) {
-               super(config);
-               setTaskManagerNumSlots(parallelism);
-       }
-
        @Override
        protected void preSubmit() throws Exception {
                dataPath = createTempFile("data_points.txt", DATA_POINTS);
@@ -62,63 +44,35 @@ public class IterationWithChainingITCase extends RecordAPITestBase {
        }
 
        @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(DATA_POINTS, resultPath);
-       }
-
-       @Override
-       protected Plan getTestJob() {
-               return getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath);
-       }
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
 
-       @Parameters
-       public static Collection<Object[]> getConfigurations() {
-               Configuration config1 = new Configuration();
-               config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism);
-               return toParameterList(config1);
-       }
+               DataSet<Tuple2<Integer, CoordVector>> initialInput
+                               = env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input");
 
-       public static final class IdentityMapper extends MapFunction implements Serializable {
+               IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop");
 
-               private static final long serialVersionUID = 1L;
+               DataSet<Tuple2<Integer, CoordVector>> identity
+                               = iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+                                       @Override
+                                       public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception {
+                                               for (Tuple2<Integer, CoordVector> value : values) {
+                                                       out.collect(value);
+                                               }
+                                       }
+                               }).map(new MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() {
+                                       @Override
+                                       public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> value) throws Exception {
+                                               return value;
+                                       }
 
-               @Override
-               public void map(Record rec, Collector<Record> out) {
-                       out.collect(rec);
-               }
-       }
+                               });
 
-       public static final class DummyReducer extends ReduceFunction implements Serializable {
+               iteration.closeWith(identity).writeAsFormattedText(resultPath, new PointFormatter());
 
-               private static final long serialVersionUID = 1L;
+               env.execute("Iteration with chained map test");
 
-               @Override
-               public void reduce(Iterator<Record> it, Collector<Record> out) {
-                       while (it.hasNext()) {
-                               out.collect(it.next());
-                       }
-               }
-       }
-
-       static Plan getTestPlan(int numSubTasks, String input, String output) {
-
-               FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input");
-               initialInput.setParallelism(1);
-
-               BulkIteration iteration = new BulkIteration("Loop");
-               iteration.setInput(initialInput);
-               iteration.setMaximumNumberOfIterations(2);
-
-               ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution())
-                               .name("Reduce something").build();
-
-               MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build();
-               iteration.setNextPartialSolution(dummyMap);
-
-               FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output");
-
-               Plan plan = new Plan(finalResult, "Iteration with chained map test");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
+               compareResultsByLinesInMemory(DATA_POINTS, resultPath);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
index 2a4a4b7..8756429 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java
@@ -25,10 +25,11 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.CoordVector;
+import org.apache.flink.test.util.PointFormatter;
+import org.apache.flink.test.util.PointInFormat;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 
 public class IterationWithUnionITCase extends JavaProgramTestBase {
@@ -54,32 +55,32 @@ public class IterationWithUnionITCase extends JavaProgramTestBase {
        protected void testProgram() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                
-               DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
+               DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1);
                
-               IterativeDataSet<Record> iteration = initialInput.iterate(2);
+               IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2);
                
-               DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper());
+               DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper());
                
-               iteration.closeWith(result).write(new PointOutFormat(), this.resultPath);
+               iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter());
                
                env.execute();
        }
        
-       static final class IdentityMapper implements MapFunction<Record, Record>, Serializable {
+       static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public Record map(Record rec) {
+               public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> rec) {
                        return rec;
                }
        }
 
-       static class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable {
+       static class DummyReducer implements GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable {
                private static final long serialVersionUID = 1L;
 
                @Override
-               public void reduce(Iterable<Record> it, Collector<Record> out) {
-                       for (Record r : it) {
+               public void reduce(Iterable<Tuple2<Integer, CoordVector>> it, Collector<Tuple2<Integer, CoordVector>> out) {
+                       for (Tuple2<Integer, CoordVector> r : it) {
                                out.collect(r);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
deleted file mode 100644
index ac3659a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class IterativeKMeansITCase extends RecordAPITestBase {
-
-       protected String dataPath;
-       protected String clusterPath;
-       protected String resultPath;
-
-       public IterativeKMeansITCase(){
-               setTaskManagerNumSlots(parallelism);
-       }
-       
-       @Override
-       protected void preSubmit() throws Exception {
-               dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-               clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-               resultPath = getTempDirPath("result");
-       }
-       
-       @Override
-       protected Plan getTestJob() {
-               KMeansBroadcast kmi = new KMeansBroadcast();
-               return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
-       }
-
-
-       @Override
-       protected void postSubmit() throws Exception {
-               List<String> resultLines = new ArrayList<String>();
-               readAllResultLines(resultLines, resultPath);
-               
-               KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
deleted file mode 100644
index fcf43df..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-
-public class KMeansITCase extends RecordAPITestBase {
-
-       protected String dataPath;
-       protected String clusterPath;
-       protected String resultPath;
-
-       public KMeansITCase(){
-               setTaskManagerNumSlots(parallelism);
-       }
-
-       @Override
-       protected void preSubmit() throws Exception {
-               dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-               clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-               resultPath = getTempDirPath("result");
-       }
-       
-       @Override
-       protected Plan getTestJob() {
-               KMeansBroadcast kmi = new KMeansBroadcast();
-               return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20");
-       }
-
-
-       @Override
-       protected void postSubmit() throws Exception {
-               List<String> resultLines = new ArrayList<String>();
-               readAllResultLines(resultLines, resultPath);
-               
-               KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index ab8ff45..959a17a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -21,11 +21,24 @@ package org.apache.flink.test.optimizer.examples;
 import static org.junit.Assert.*;
 
 import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
@@ -34,11 +47,10 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class KMeansSingleStepTest extends CompilerTestBase {
        
        private static final String DATAPOINTS = "Data Points";
@@ -54,16 +66,15 @@ public class KMeansSingleStepTest extends CompilerTestBase {
        
        @Test
        public void testCompileKMeansSingleStepWithStats() {
-               
-               KMeansSingleStep kmi = new KMeansSingleStep();
-               Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+               Plan p = getKMeansPlan();
                p.setExecutionConfig(new ExecutionConfig());
                // set the statistics
                OperatorResolver cr = getContractResolver(p);
-               FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-               FileDataSource centersSource = cr.getNode(CENTERS);
-               setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-               setSourceStatistics(centersSource, 1024*1024, 32f);
+               GenericDataSourceBase pointsSource = cr.getNode(DATAPOINTS);
+               GenericDataSourceBase centersSource = cr.getNode(CENTERS);
+               setSourceStatistics(pointsSource, 100l * 1024 * 1024 * 1024, 32f);
+               setSourceStatistics(centersSource, 1024 * 1024, 32f);
                
                OptimizedPlan plan = compileWithStats(p);
                checkPlan(plan);
@@ -71,9 +82,8 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 
        @Test
        public void testCompileKMeansSingleStepWithOutStats() {
-               
-               KMeansSingleStep kmi = new KMeansSingleStep();
-               Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
+
+               Plan p = getKMeansPlan();
                p.setExecutionConfig(new ExecutionConfig());
                OptimizedPlan plan = compileNoStats(p);
                checkPlan(plan);
@@ -97,7 +107,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
                assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
                assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
                
-               assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
+               assertEquals(DriverStrategy.MAP, mapper.getDriverStrategy());
                
                assertNull(mapper.getInput().getLocalStrategyKeys());
                assertNull(mapper.getInput().getLocalStrategySortOrder());
@@ -127,4 +137,145 @@ public class KMeansSingleStepTest extends CompilerTestBase {
                assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
                assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
        }
+
+       public static Plan getKMeansPlan() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       KMeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
+               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("KMeans failed with an exception");
+               }
+               return env.getPlan();
+       }
+
+       public static void KMeans(String[] args) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Point> points = env.readCsvFile(args[0])
+                               .fieldDelimiter(" ")
+                               .includeFields(true, true)
+                               .types(Double.class, Double.class)
+                               .name(DATAPOINTS)
+                               .map(new MapFunction<Tuple2<Double, Double>, Point>() {
+                                       @Override
+                                       public Point map(Tuple2<Double, Double> value) throws Exception {
+                                               return new Point(value.f0, value.f1);
+                                       }
+                               });
+
+               DataSet<Centroid> centroids = env.readCsvFile(args[1])
+                               .fieldDelimiter(" ")
+                               .includeFields(true, true, true)
+                               .types(Integer.class, Double.class, Double.class)
+                               .name(CENTERS)
+                               .map(new MapFunction<Tuple3<Integer, Double, Double>, Centroid>() {
+                                       @Override
+                                       public Centroid map(Tuple3<Integer, Double, Double> value) throws Exception {
+                                               return new Centroid(value.f0, value.f1, value.f2);
+                                       }
+                               });
+
+               DataSet<Tuple3<Integer, Point, Integer>> newCentroids = points
+                               .map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids");
+
+               DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter
+                               = newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME);
+
+               recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK);
+
+               env.execute("KMeans Example");
+       }
+
+       public static class Point extends Tuple2<Double, Double> {
+               public Point(double x, double y) {
+                       this.f0 = x;
+                       this.f1 = y;
+               }
+
+               public Point add(Point other) {
+                       f0 += other.f0;
+                       f1 += other.f1;
+                       return this;
+               }
+
+               public Point div(long val) {
+                       f0 /= val;
+                       f1 /= val;
+                       return this;
+               }
+
+               public double euclideanDistance(Point other) {
+                       return Math.sqrt((f0 - other.f0) * (f0 - other.f0) + (f1 - other.f1) * (f1 - other.f1));
+               }
+
+               public double euclideanDistance(Centroid other) {
+                       return Math.sqrt((f0 - other.f1.f0) * (f0 - other.f1.f0) + (f1 - other.f1.f1) * (f1 - other.f1.f1));
+               }
+       }
+
+       public static class Centroid extends Tuple2<Integer, Point> {
+               public Centroid(int id, double x, double y) {
+                       this.f0 = id;
+                       this.f1 = new Point(x, y);
+               }
+
+               public Centroid(int id, Point p) {
+                       this.f0 = id;
+                       this.f1 = p;
+               }
+       }
+
+       /**
+        * Determines the closest cluster center for a data point.
+        */
+       public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> {
+               private Collection<Centroid> centroids;
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+               }
+
+               @Override
+               public Tuple3<Integer, Point, Integer> map(Point p) throws Exception {
+                       double minDistance = Double.MAX_VALUE;
+                       int closestCentroidId = -1;
+                       for (Centroid centroid : centroids) {
+                               double distance = p.euclideanDistance(centroid);
+                               if (distance < minDistance) {
+                                       minDistance = distance;
+                                       closestCentroidId = centroid.f0;
+                               }
+                       }
+                       return new Tuple3<>(closestCentroidId, p, 1);
+               }
+       }
+
+       @Combinable
+       public static final class RecomputeClusterCenter extends RichGroupReduceFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>> {
+               @Override
+               public void reduce(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+                       int id = -1;
+                       double x = 0;
+                       double y = 0;
+                       int count = 0;
+                       for (Tuple3<Integer, Point, Integer> value : values) {
+                               id = value.f0;
+                               x += value.f1.f0;
+                               y += value.f1.f1;
+                               count += value.f2;
+                       }
+                       out.collect(new Tuple3<>(id, new Point(x, y), count));
+               }
+
+               @Override
+               public void combine(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception {
+                       reduce(values, out);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index f4efb8a..e929913 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -22,10 +22,23 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -35,23 +48,25 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
  * Tests TPCH Q3 (simplified) under various input conditions.
  */
-@SuppressWarnings("deprecation")
+@SuppressWarnings("serial")
 public class RelationalQueryCompilerTest extends CompilerTestBase {
        
        private static final String ORDERS = "Orders";
        private static final String LINEITEM = "LineItems";
        private static final String MAPPER_NAME = "FilterO";
        private static final String JOIN_NAME = "JoinLiO";
+       private static final String REDUCE_NAME = "AggLiO";
+       private static final String SINK = "Output";
        
        private final FieldList set0 = new FieldList(0);
-       private final FieldList set01 = new FieldList(new int[] {0,1});
+       private final FieldList set01 = new FieldList(0,1);
        private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig();
        
        // ------------------------------------------------------------------------
@@ -63,8 +78,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
        @Test
        public void testQueryNoStatistics() {
                try {
-                       TPCHQuery3 query = new TPCHQuery3();
-                       Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+                       Plan p = getTPCH3Plan();
                        p.setExecutionConfig(defaultExecutionConfig);
                        // compile
                        final OptimizedPlan plan = compileNoStats(p);
@@ -72,12 +86,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                        final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
                        
                        // get the nodes from the final plan
-                       final SinkPlanNode sink = or.getNode("Output");
-                       final SingleInputPlanNode reducer = or.getNode("AggLio");
+                       final SinkPlanNode sink = or.getNode(SINK);
+                       final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
                        final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
                                        (SingleInputPlanNode) reducer.getPredecessor() : null;
-                       final DualInputPlanNode join = or.getNode("JoinLiO");
-                       final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+                       final DualInputPlanNode join = or.getNode(JOIN_NAME);
+                       final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
                        
                        // verify the optimizer choices
                        checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
@@ -95,7 +109,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testQueryAnyValidPlan() {
-               testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true);
+               testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, 0.05f, true, true, true, false, true);
        }
        
        /**
@@ -103,7 +117,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testQueryWithSizeZeroInputs() {
-               testQueryGeneric(0, 0, 0.5f, true, true, true, false, true);
+               testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true);
        }
        
        /**
@@ -111,7 +125,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testQueryWithStatsForBroadcastHash() {
-               testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false);
+               testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.01f, 0.05f, true, false, true, false, false);
        }
        
        /**
@@ -119,7 +133,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testQueryWithStatsForRepartitionAny() {
-               testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true);
+               testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.1f, 0.5f, false, true, true, true, true);
        }
        
        /**
@@ -128,34 +142,23 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         */
        @Test
        public void testQueryWithStatsForRepartitionMerge() {
-               TPCHQuery3 query = new TPCHQuery3();
-               Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+               Plan p = getTPCH3Plan();
                p.setExecutionConfig(defaultExecutionConfig);
                // set compiler hints
                OperatorResolver cr = getContractResolver(p);
-               JoinOperator match = cr.getNode("JoinLiO");
+               DualInputOperator<?,?,?,?> match = cr.getNode(JOIN_NAME);
                match.getCompilerHints().setFilterFactor(100f);
                
-               testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true);
+               testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.01f, 100f, false, true, false, false, true);
        }
        
        // ------------------------------------------------------------------------
-       
-       private void testQueryGeneric(long orderSize, long lineItemSize, 
-                       float ordersFilterFactor, 
-                       boolean broadcastOkay, boolean partitionedOkay,
-                       boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
-       {
-               testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
-       }
-       
        private void testQueryGeneric(long orderSize, long lineItemSize, 
                        float ordersFilterFactor, float joinFilterFactor,
                        boolean broadcastOkay, boolean partitionedOkay,
                        boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay)
        {
-               TPCHQuery3 query = new TPCHQuery3();
-               Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE);
+               Plan p = getTPCH3Plan();
                p.setExecutionConfig(defaultExecutionConfig);
                testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
        }
@@ -168,10 +171,10 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                try {
                        // set statistics
                        OperatorResolver cr = getContractResolver(p);
-                       FileDataSource ordersSource = cr.getNode(ORDERS);
-                       FileDataSource lineItemSource = cr.getNode(LINEITEM);
-                       MapOperator mapper = cr.getNode(MAPPER_NAME);
-                       JoinOperator joiner = cr.getNode(JOIN_NAME);
+                       GenericDataSourceBase<?,?> ordersSource = cr.getNode(ORDERS);
+                       GenericDataSourceBase<?,?> lineItemSource = cr.getNode(LINEITEM);
+                       SingleInputOperator<?,?,?> mapper = cr.getNode(MAPPER_NAME);
+                       DualInputOperator<?,?,?,?> joiner = cr.getNode(JOIN_NAME);
                        setSourceStatistics(ordersSource, orderSize, 100f);
                        setSourceStatistics(lineItemSource, lineitemSize, 140f);
                        mapper.getCompilerHints().setAvgOutputRecordSize(16f);
@@ -183,12 +186,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                        final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
                        
                        // get the nodes from the final plan
-                       final SinkPlanNode sink = or.getNode("Output");
-                       final SingleInputPlanNode reducer = or.getNode("AggLio");
+                       final SinkPlanNode sink = or.getNode(SINK);
+                       final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME);
                        final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ?
                                        (SingleInputPlanNode) reducer.getPredecessor() : null;
-                       final DualInputPlanNode join = or.getNode("JoinLiO");
-                       final SingleInputPlanNode filteringMapper = or.getNode("FilterO");
+                       final DualInputPlanNode join = or.getNode(JOIN_NAME);
+                       final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME);
                        
                        checkStandardStrategies(filteringMapper, join, combiner, reducer, sink);
                        
@@ -230,7 +233,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
        // ------------------------------------------------------------------------
        //  Checks for special conditions
        // ------------------------------------------------------------------------
-       
+
        private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner,
                        SingleInputPlanNode reducer, SinkPlanNode sink)
        {
@@ -239,7 +242,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
                
                // check the driver strategies that are always fix
-               Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy());
+               Assert.assertEquals(DriverStrategy.FLAT_MAP, map.getDriverStrategy());
                Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
                Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy());
                if (combiner != null) {
@@ -348,4 +351,73 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                        return false;
                }
        }
+
+       public static Plan getTPCH3Plan() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       TCPH3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
+               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("TCPH3 failed with an exception");
+               }
+               return env.getPlan();
+       }
+
+       public static void TCPH3(String[] args) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(Integer.parseInt(args[0]));
+
+               //order id, order status, order data, order prio, ship prio
+               DataSet<Tuple5<Long, String, String, String, Integer>> orders
+                               = env.readCsvFile(args[1])
+                               .fieldDelimiter("|").lineDelimiter("\n")
+                               .includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class)
+                               .name(ORDERS);
+
+               //order id, extended price
+               DataSet<Tuple2<Long, Double>> lineItems
+                               = env.readCsvFile(args[2])
+                               .fieldDelimiter("|").lineDelimiter("\n")
+                               .includeFields("100001").types(Long.class, Double.class)
+                               .name(LINEITEM);
+
+               DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME);
+
+               DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME);
+
+               DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME);
+
+               aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
+
+               env.execute();
+       }
+
+       @ForwardedFields("f0; f4->f1")
+       public static class FilterO implements FlatMapFunction<Tuple5<Long, 
String, String, String, Integer>, Tuple2<Long, Integer>> {
+               @Override
+               public void flatMap(Tuple5<Long, String, String, String, 
Integer> value, Collector<Tuple2<Long, Integer>> out) throws Exception {
+                       // not going to be executed
+               }
+       }
+
+       @ForwardedFieldsFirst("f0; f1")
+       public static class JoinLiO implements FlatJoinFunction<Tuple2<Long, 
Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> {
+               @Override
+               public void join(Tuple2<Long, Integer> first, Tuple2<Long, 
Double> second, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+                       // not going to be executed
+               }
+       }
+
+       @ForwardedFields("f0; f1")
+       @Combinable
+       public static class AggLiO extends RichGroupReduceFunction<Tuple3<Long, 
Integer, Double>, Tuple3<Long, Integer, Double>> {
+               @Override
+               public void reduce(Iterable<Tuple3<Long, Integer, Double>> 
values, Collector<Tuple3<Long, Integer, Double>> out) throws Exception {
+                       // not going to be executed
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index 99402a5..e134c7a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -20,7 +20,19 @@ package org.apache.flink.test.optimizer.iterations;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -33,13 +45,14 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup;
+import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
 /**
  *
  */
+@SuppressWarnings("serial")
 public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
        
        private static final String VERTEX_SOURCE = "Vertices";
@@ -59,10 +72,7 @@ public class ConnectedComponentsCoGroupTest extends 
CompilerTestBase {
        
        @Test
        public void testWorksetConnectedComponents() {
-               ConnectedComponentsWithCoGroup cc = new 
ConnectedComponentsWithCoGroup();
-
-               Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM),
-                               IN_FILE, IN_FILE, OUT_FILE, 
String.valueOf(100));
+               Plan plan = getConnectedComponentsCoGroupPlan();
                plan.setExecutionConfig(new ExecutionConfig());
                OptimizedPlan optPlan = compileNoStats(plan);
                OptimizerPlanNodeResolver or = 
getOptimizerPlanNodeResolver(optPlan);
@@ -134,4 +144,68 @@ public class ConnectedComponentsCoGroupTest extends 
CompilerTestBase {
                JobGraphGenerator jgg = new JobGraphGenerator();
                jgg.compileJobGraph(optPlan);
        }
+
+       public static Plan getConnectedComponentsCoGroupPlan() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       ConnectedComponentsWithCoGroup(new 
String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
+               } catch (ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("ConnectedComponentsWithCoGroup failed with 
an exception");
+               }
+               return env.getPlan();
+       }
+
+       public static void ConnectedComponentsWithCoGroup(String[] args) throws 
Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(Integer.parseInt(args[0]));
+
+               DataSet<Tuple1<Long>> initialVertices = 
env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE);
+
+               DataSet<Tuple2<Long, Long>> edges = 
env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE);
+
+               DataSet<Tuple2<Long, Long>> verticesWithId = 
initialVertices.flatMap(new DummyMapFunction());
+
+               DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration
+                               = verticesWithId.iterateDelta(verticesWithId, 
Integer.parseInt(args[4]), 0).name(ITERATION_NAME);
+
+               DataSet<Tuple2<Long, Long>> joinWithNeighbors = 
iteration.getWorkset().join(edges)
+                               .where(0).equalTo(0)
+                               .with(new 
DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH);
+
+               DataSet<Tuple2<Long, Long>> minAndUpdate = 
joinWithNeighbors.coGroup(iteration.getSolutionSet())
+                               .where(0).equalTo(0)
+                               .with(new 
DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE);
+
+               iteration.closeWith(minAndUpdate, 
minAndUpdate).writeAsCsv(args[3]).name(SINK);
+
+               env.execute();
+       }
+
+       public static class DummyMapFunction implements 
FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
+               @Override
+               public void flatMap(Tuple1<Long> value, Collector<Tuple2<Long, 
Long>> out) throws Exception {
+                       // won't be executed
+               }
+       }
+
+       public static class DummyJoinFunction implements 
FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+               @Override
+               public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> 
second, Collector<Tuple2<Long, Long>> out) throws Exception {
+                       // won't be executed
+               }
+       }
+
+       @ForwardedFieldsFirst("f0->f0")
+       @ForwardedFieldsSecond("f0->f0")
+       public static class DummyCoGroupFunction implements 
CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+               @Override
+               public void coGroup(Iterable<Tuple2<Long, Long>> first, 
Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws 
Exception {
+                       // won't be executed
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
deleted file mode 100644
index 3785270..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.optimizer.iterations;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.optimizer.util.OperatorResolver;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class IterativeKMeansTest extends CompilerTestBase {
-       
-       private static final String DATAPOINTS = "Data Points";
-       private static final String CENTERS = "Centers";
-       
-       private static final String MAPPER_NAME = "Find Nearest Centers";
-       private static final String REDUCER_NAME = "Recompute Center Positions";
-       
-       private static final String ITERATION_NAME = "k-means loop";
-       
-       private static final String SINK = "New Center Positions";
-       
-       private final FieldList set0 = new FieldList(0);
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  K-Means (Bulk Iteration)
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Test
-       public void testCompileKMeansSingleStepWithStats() {
-               
-               KMeansBroadcast kmi = new KMeansBroadcast();
-               Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), 
IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-               p.setExecutionConfig(new ExecutionConfig());
-               // set the statistics
-               OperatorResolver cr = getContractResolver(p);
-               FileDataSource pointsSource = cr.getNode(DATAPOINTS);
-               FileDataSource centersSource = cr.getNode(CENTERS);
-               setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f);
-               setSourceStatistics(centersSource, 1024*1024, 32f);
-               
-               OptimizedPlan plan = compileWithStats(p);
-               checkPlan(plan);
-               
-               new JobGraphGenerator().compileJobGraph(plan);
-       }
-
-       @Test
-       public void testCompileKMeansSingleStepWithOutStats() {
-               
-               KMeansBroadcast kmi = new KMeansBroadcast();
-               Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), 
IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20));
-               p.setExecutionConfig(new ExecutionConfig());
-               OptimizedPlan plan = compileNoStats(p);
-               checkPlan(plan);
-               
-               new JobGraphGenerator().compileJobGraph(plan);
-       }
-       
-       private void checkPlan(OptimizedPlan plan) {
-               
-               OptimizerPlanNodeResolver or = 
getOptimizerPlanNodeResolver(plan);
-               
-               final SinkPlanNode sink = or.getNode(SINK);
-               final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
-               final SingleInputPlanNode combiner = (SingleInputPlanNode) 
reducer.getPredecessor();
-               final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-               
-               final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-               
-               // -------------------- outside the loop -----------------------
-               
-               // check the sink
-               assertEquals(ShipStrategyType.FORWARD, 
sink.getInput().getShipStrategy());
-               assertEquals(LocalStrategy.NONE, 
sink.getInput().getLocalStrategy());
-               
-               // check the iteration
-               assertEquals(ShipStrategyType.FORWARD, 
iter.getInput().getShipStrategy());
-               assertEquals(LocalStrategy.NONE, 
iter.getInput().getLocalStrategy());
-               
-               
-               // -------------------- inside the loop -----------------------
-               
-               // check the mapper
-               assertEquals(1, mapper.getBroadcastInputs().size());
-               assertEquals(ShipStrategyType.FORWARD, 
mapper.getInput().getShipStrategy());
-               assertEquals(ShipStrategyType.BROADCAST, 
mapper.getBroadcastInputs().get(0).getShipStrategy());
-               assertFalse(mapper.getInput().isOnDynamicPath());
-               
assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
-               assertTrue(mapper.getInput().getTempMode().isCached());
-               
-               assertEquals(LocalStrategy.NONE, 
mapper.getInput().getLocalStrategy());
-               assertEquals(LocalStrategy.NONE, 
mapper.getBroadcastInputs().get(0).getLocalStrategy());
-               
-               assertEquals(DriverStrategy.COLLECTOR_MAP, 
mapper.getDriverStrategy());
-               
-               assertNull(mapper.getInput().getLocalStrategyKeys());
-               assertNull(mapper.getInput().getLocalStrategySortOrder());
-               
assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
-               
assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-               
-               // check the combiner
-               Assert.assertNotNull(combiner);
-               assertEquals(ShipStrategyType.FORWARD, 
combiner.getInput().getShipStrategy());
-               assertTrue(combiner.getInput().isOnDynamicPath());
-               
-               assertEquals(LocalStrategy.NONE, 
combiner.getInput().getLocalStrategy());
-               assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, 
combiner.getDriverStrategy());
-               assertNull(combiner.getInput().getLocalStrategyKeys());
-               assertNull(combiner.getInput().getLocalStrategySortOrder());
-               assertEquals(set0, combiner.getKeys(0));
-               assertEquals(set0, combiner.getKeys(1));
-               
-               // check the reducer
-               assertEquals(ShipStrategyType.PARTITION_HASH, 
reducer.getInput().getShipStrategy());
-               assertTrue(reducer.getInput().isOnDynamicPath());
-               assertEquals(LocalStrategy.COMBININGSORT, 
reducer.getInput().getLocalStrategy());
-               assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, 
reducer.getDriverStrategy());
-               assertEquals(set0, reducer.getKeys(0));
-               assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-               
assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), 
reducer.getSortOrders(0)));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 24d9416..d186cbb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -25,13 +25,12 @@ import 
org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -45,17 +44,34 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
        
        @Test
        public void dumpWordCount() {
-               dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, 
IN_FILE, OUT_FILE));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       WordCount.main(new String[] {IN_FILE, OUT_FILE});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("WordCount failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
        public void dumpTPCH3() {
-               dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, 
IN_FILE, IN_FILE, OUT_FILE));
-       }
-       
-       @Test
-       public void dumpKMeans() {
-               dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, 
IN_FILE, IN_FILE, OUT_FILE));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, 
OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("TPCH3 failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
@@ -64,7 +80,6 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
                PreviewPlanEnvironment env = new PreviewPlanEnvironment();
                env.setAsContext();
                try {
-                       // <points path> <centers path> <result path> <num 
iterations
                        KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, 
"123"});
                } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
                        // all good.
@@ -77,17 +92,50 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
        
        @Test
        public void dumpWebLogAnalysis() {
-               dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, 
IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, 
OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("WebLogAnalysis failed with an exception");
+               }
+               dump(env.getPlan());
        }
 
        @Test
        public void dumpBulkIterationKMeans() {
-               dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, 
IN_FILE, OUT_FILE));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       ConnectedComponents.main(new String[] {IN_FILE, 
IN_FILE, OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("ConnectedComponents failed with an 
exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
-       public void dumpDeltaPageRank() {
-               dump(new 
DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, 
IN_FILE, IN_FILE, OUT_FILE, "10"));
+       public void dumpPageRank() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       PageRankBasic.main(new String[] {IN_FILE, IN_FILE, 
OUT_FILE, "10", "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("PageRank failed with an 
+               }
+               dump(env.getPlan());
        }
        
        private void dump(Plan p) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 49fe6d8..95a06c3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -21,16 +21,17 @@ package org.apache.flink.test.optimizer.jsonplan;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -40,51 +41,102 @@ import org.junit.Test;
 /*
  * The tests in this class simply invokes the JSON dump code for the original 
plan.
  */
-public class PreviewPlanDumpTest {
+public class PreviewPlanDumpTest extends CompilerTestBase {
        
-       protected static final String IN_FILE = OperatingSystem.isWindows() ?  
"file:/c:/test/file" : "file:///test/file";
-       
-       protected static final String OUT_FILE = OperatingSystem.isWindows() ?  
"file:/c:/test/output" : "file:///test/output";
-       
-       protected static final String[] NO_ARGS = new String[0];
-
        @Test
        public void dumpWordCount() {
-               dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
-               
-               // The web interface passes empty string-args to compute the 
preview of the
-               // job, so we should test this situation too
-               dump(new WordCount().getPlan(NO_ARGS));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       WordCount.main(new String[] {IN_FILE, OUT_FILE});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("WordCount failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
        public void dumpTPCH3() {
-               dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
-               dump(new TPCHQuery3().getPlan(NO_ARGS));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, 
OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("TPCH3 failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
-       public void dumpKMeans() {
-               dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, 
OUT_FILE));
-               dump(new KMeansSingleStep().getPlan(NO_ARGS));
+       public void dumpIterativeKMeans() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, 
"123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("KMeans failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
        public void dumpWebLogAnalysis() {
-               dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, 
IN_FILE, OUT_FILE));
-               dump(new WebLogAnalysis().getPlan(NO_ARGS));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       
org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] 
{IN_FILE, IN_FILE, OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("WebLogAnalysis failed with an exception");
+               }
+               dump(env.getPlan());
        }
-       
+
        @Test
        public void dumpBulkIterationKMeans() {
-               dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
-               dump(new KMeansBroadcast().getPlan(NO_ARGS));
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       ConnectedComponents.main(new String[] {IN_FILE, 
IN_FILE, OUT_FILE, "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("ConnectedComponents failed with an 
exception");
+               }
+               dump(env.getPlan());
        }
        
        @Test
-       public void dumpDeltaPageRank() {
-               dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, 
IN_FILE, IN_FILE, OUT_FILE, "10"));
-               dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+       public void dumpPageRank() {
+               // prepare the test environment
+               PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+               env.setAsContext();
+               try {
+                       PageRankBasic.main(new String[] {IN_FILE, IN_FILE, 
OUT_FILE, "10", "123"});
+               } catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+                       // all good.
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("PageRank failed with an exception");
+               }
+               dump(env.getPlan());
        }
        
        private void dump(Plan p) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 310ded8..ec2dbb7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -29,8 +29,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.junit.After;
 import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.test.util.JavaProgramTestBase;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -62,8 +62,8 @@ public class NetworkStackThroughputITCase {
 
        // 
------------------------------------------------------------------------
 
-       // wrapper to reuse RecordAPITestBase code in runs via main()
-       private static class TestBaseWrapper extends RecordAPITestBase {
+       // wrapper to reuse JavaProgramTestBase code in runs via main()
+       private static class TestBaseWrapper extends JavaProgramTestBase {
 
                private int dataVolumeGb;
                private boolean useForwarder;
@@ -90,7 +90,6 @@ public class NetworkStackThroughputITCase {
                        setTaskManagerNumSlots(numSlots);
                }
 
-               @Override
                protected JobGraph getJobGraph() throws Exception {
                        return createJobGraph(dataVolumeGb, useForwarder, 
isSlowSender, isSlowReceiver, parallelism);
                }
@@ -138,19 +137,19 @@ public class NetworkStackThroughputITCase {
                        return jobGraph;
                }
 
-               @After
-               public void calculateThroughput() {
-                       if (getJobExecutionResult() != null) {
-                               int dataVolumeGb = 
this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
 
-                               long dataVolumeMbit = dataVolumeGb * 8192;
-                               long runtimeSecs = 
getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
+               @Override
+               protected void testProgram() throws Exception {
+                       JobExecutionResult jer = 
executor.submitJobAndWait(getJobGraph(), false);
+                       int dataVolumeGb = 
this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+                       long dataVolumeMbit = dataVolumeGb * 8192;
+                       long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
 
-                               int mbitPerSecond = (int) (((double) 
dataVolumeMbit) / runtimeSecs);
+                       int mbitPerSecond = (int) (((double) dataVolumeMbit) / 
runtimeSecs);
 
-                               LOG.info(String.format("Test finished with 
throughput of %d MBit/s (runtime [secs]: %d, " +
-                                               "data volume [gb/mbits]: 
%d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
-                       }
+                       LOG.info(String.format("Test finished with throughput 
of %d MBit/s (runtime [secs]: %d, " +
+                                       "data volume [gb/mbits]: %d/%d)", 
mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
                }
        }
 
@@ -289,8 +288,7 @@ public class NetworkStackThroughputITCase {
                        TestBaseWrapper test = new TestBaseWrapper(config);
 
                        System.out.println(Arrays.toString(p));
-                       test.testJob();
-                       test.calculateThroughput();
+                       test.testProgram();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
deleted file mode 100644
index 0cff9b6..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testPrograms.util.tests;
-
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class IntTupleDataInFormatTest
-{
-       @Test
-       public void testReadLineKeyValuePairOfIntValueTupleByteArray() {
-               
-               String[] testTuples = {
-                       "1|attribute1|attribute2|3|attribute4|5|",
-                       "2|3|",
-                       "3|attribute1|attribute2|",
-                       "-1|attr1|attr2|",
-                       "-2|attribute1|attribute2|",
-                       Integer.MAX_VALUE+"|attr1|attr2|attr3|attr4|",
-                       Integer.MIN_VALUE+"|attr1|attr2|attr3|attr4|"
-               };
-               
-               int[] expectedKeys = {
-                       1,2,3,-1,-2,Integer.MAX_VALUE,Integer.MIN_VALUE
-               };
-               
-               int[] expectedAttrCnt = {6,2,3,3,3,5,5};
-
-               IntTupleDataInFormat inFormat = new IntTupleDataInFormat();
-               Record rec = new Record();      
-               
-               for(int i = 0; i < testTuples.length; i++) {
-                       
-                       byte[] tupleBytes = testTuples[i].getBytes();
-                       
-                       inFormat.readRecord(rec, tupleBytes, 0, 
tupleBytes.length);
-                       
-                       Assert.assertTrue("Expected Key: "+expectedKeys[i]+" != 
Returned Key: "+rec.getField(0, IntValue.class), rec.getField(0, 
IntValue.class).equals(new IntValue(expectedKeys[i])));
-                       Assert.assertTrue("Expected Attr Cnt: 
"+expectedAttrCnt[i]+" != Returned Attr Cnt: "+rec.getField(1, Tuple.class), 
rec.getField(1, Tuple.class).getNumberOfColumns() == expectedAttrCnt[i]);
-               }
-       }
-}
