http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
index 5a0a6fc..bf2bbae 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -77,7 +77,7 @@ public class IterativeKMeansTest extends CompilerTestBase {
                OptimizedPlan plan = compileWithStats(p);
                checkPlan(plan);
                
-               new NepheleJobGraphGenerator().compileJobGraph(plan);
+               new JobGraphGenerator().compileJobGraph(plan);
        }
 
        @Test
@@ -89,7 +89,7 @@ public class IterativeKMeansTest extends CompilerTestBase {
                OptimizedPlan plan = compileNoStats(p);
                checkPlan(plan);
                
-               new NepheleJobGraphGenerator().compileJobGraph(plan);
+               new JobGraphGenerator().compileJobGraph(plan);
        }
        
        private void checkPlan(OptimizedPlan plan) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
index cc8f788..e6a1e69 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 
 @SuppressWarnings("serial")
 public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase 
{
@@ -79,7 +79,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends 
CompilerTestBase {
                        assertEquals(SolutionSetPlanNode.class, 
join1.getInput1().getSource().getClass());
                        assertEquals(SolutionSetPlanNode.class, 
join2.getInput2().getSource().getClass());
                        
-                       new NepheleJobGraphGenerator().compileJobGraph(optPlan);
+                       new JobGraphGenerator().compileJobGraph(optPlan);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 731e344..f05bd25 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -68,7 +68,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
                        IterativeDataSet<Tuple2<Long, Double>> iteration = 
pagesWithRanks.iterate(10);
                        
                        Configuration cfg = new Configuration();
-                       cfg.setString(PactCompiler.HINT_LOCAL_STRATEGY, 
PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
+                       cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, 
Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
                        
                        DataSet<Tuple2<Long, Double>> newRanks = iteration
                                        // join pages with outgoing edges and 
distribute rank

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
index 1cf92e6..b348333 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.compiler.plandump;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
@@ -89,7 +89,7 @@ public class PreviewPlanDumpTest {
        
        private void dump(Plan p) {
                try {
-                       List<DataSinkNode> sinks = 
PactCompiler.createPreOptimizedPlan(p);
+                       List<DataSinkNode> sinks = 
Optimizer.createPreOptimizedPlan(p);
                        PlanJSONDumpGenerator dumper = new 
PlanJSONDumpGenerator();
                        String json = dumper.getPactPlanAsJSON(sinks);
                        JsonParser parser = new 
JsonFactory().createJsonParser(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
index b88eb4e..41b84e6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java
@@ -27,11 +27,6 @@ public class WordCountITCase extends JavaProgramTestBase {
        protected String textPath;
        protected String resultPath;
 
-       public WordCountITCase(){
-               setDegreeOfParallelism(4);
-               setNumTaskManagers(2);
-               setTaskManagerNumSlots(2);
-       }
 
        @Override
        protected void preSubmit() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
index a5c2bb4..9021c6a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java
@@ -24,9 +24,9 @@ import 
org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -88,11 +88,11 @@ public class TaskFailureITCase extends FailingTestBase {
                plan.setDefaultParallelism(DOP);
 
                // optimize and compile plan 
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                OptimizedPlan op = pc.compile(plan);
                
                // return job graph of failing job
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
 
@@ -118,11 +118,11 @@ public class TaskFailureITCase extends FailingTestBase {
                plan.setDefaultParallelism(4);
 
                // optimize and compile plan
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                OptimizedPlan op = pc.compile(plan);
 
                // return job graph of working job
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 037610e..a0e3468 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -39,12 +39,6 @@ public class WordCountMapredITCase extends 
JavaProgramTestBase {
        protected String textPath;
        protected String resultPath;
 
-       public WordCountMapredITCase(){
-//             setDegreeOfParallelism(4);
-//             setNumTaskManagers(2);
-//             setTaskManagerNumSlots(2);
-       }
-
        @Override
        protected void preSubmit() throws Exception {
                textPath = createTempFile("text.txt", WordCountData.TEXT);
@@ -59,7 +53,6 @@ public class WordCountMapredITCase extends 
JavaProgramTestBase {
        @Override
        protected void testProgram() throws Exception {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-//             env.setDegreeOfParallelism(1);
 
 
                DataSet<Tuple2<LongWritable, Text>> input = 
env.readHadoopFile(new TextInputFormat(),

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index 3bdaa22..fee49bf 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -39,12 +39,6 @@ public class WordCountMapreduceITCase extends 
JavaProgramTestBase {
        protected String textPath;
        protected String resultPath;
 
-       public WordCountMapreduceITCase(){
-//             setDegreeOfParallelism(4);
-//             setNumTaskManagers(2);
-//             setTaskManagerNumSlots(2);
-       }
-
        @Override
        protected void preSubmit() throws Exception {
                textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index 79d49aa..e5f91b4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import 
org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested;
@@ -377,7 +377,7 @@ public class GroupReduceITCase extends 
MultipleProgramsTestBase {
                                .map(new IdentityMapper<Tuple3<Integer, Long, 
String>>()).setParallelism(4);
 
                Configuration cfg = new Configuration();
-               cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION);
+               cfg.setString(Optimizer.HINT_SHIP_STRATEGY, 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
                DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new 
Tuple3AllGroupReduceWithCombine())
                                .withParameters(cfg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
index d92897d..220611d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.RecordAPITestBase;
 import org.apache.flink.types.IntValue;
@@ -156,9 +156,9 @@ public class CoGroupITCase extends RecordAPITestBase {
                CoGroupOperator testCoGrouper = CoGroupOperator.builder(new 
TestCoGrouper(), StringValue.class, 0, 0)
                        .build();
                
testCoGrouper.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks",
 1));
-               
testCoGrouper.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+               
testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
                                config.getString("CoGroupTest#LocalStrategy", 
""));
-               
testCoGrouper.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+               
testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
                                config.getString("CoGroupTest#ShipStrategy", 
""));
 
                FileDataSink output = new FileDataSink(new CoGroupOutFormat(), 
resultPath);
@@ -181,9 +181,9 @@ public class CoGroupITCase extends RecordAPITestBase {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
+               String[] localStrategies = { 
Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE };
 
-               String[] shipStrategies = { 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
+               String[] shipStrategies = { 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
                for (String localStrategy : localStrategies) {
                        for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
index f72d146..f6b4127 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.api.java.record.operators.CrossOperator;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -132,20 +132,20 @@ public class CrossITCase extends RecordAPITestBase {
 
                CrossOperator testCross = CrossOperator.builder(new 
TestCross()).build();
                
testCross.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1));
-               
testCross.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+               
testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
                                config.getString("CrossTest#LocalStrategy", 
""));
                if (config.getString("CrossTest#ShipStrategy", 
"").equals("BROADCAST_FIRST")) {
-                       
testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-                       
testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+                       
testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+                       
testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_FORWARD);
                } else if (config.getString("CrossTest#ShipStrategy", 
"").equals("BROADCAST_SECOND")) {
-                       
testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-                       
testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+                       
testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+                       
testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_FORWARD);
                } else {
-                       
testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+                       
testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
                                        
config.getString("CrossTest#ShipStrategy", ""));
                }
 
@@ -170,10 +170,10 @@ public class CrossITCase extends RecordAPITestBase {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
-                               
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
-                               
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
-                               
PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
+               String[] localStrategies = { 
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST,
+                               
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND,
+                               
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST,
+                               
Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND };
 
                String[] shipStrategies = { "BROADCAST_FIRST", 
"BROADCAST_SECOND"
                // PactCompiler.HINT_SHIP_STRATEGY_BROADCAST

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
index 6906d36..02a6e38 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat;
@@ -122,20 +122,20 @@ public class JoinITCase extends RecordAPITestBase {
                JoinOperator testMatcher = JoinOperator.builder(new 
TestMatcher(), StringValue.class, 0, 0)
                        .build();
                
testMatcher.setDegreeOfParallelism(config.getInteger("MatchTest#NoSubtasks", 
1));
-               
testMatcher.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+               
testMatcher.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
                                config.getString("MatchTest#LocalStrategy", 
""));
                if (config.getString("MatchTest#ShipStrategy", 
"").equals("BROADCAST_FIRST")) {
-                       
testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
-                       
testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
+                       
testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
+                       
testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_FORWARD);
                } else if (config.getString("MatchTest#ShipStrategy", 
"").equals("BROADCAST_SECOND")) {
-                       
testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_FORWARD);
-                       
testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT,
-                                       
PactCompiler.HINT_SHIP_STRATEGY_BROADCAST);
+                       
testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_FORWARD);
+                       
testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
+                                       Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
                } else {
-                       
testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+                       
testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
                                        
config.getString("MatchTest#ShipStrategy", ""));
                }
 
@@ -160,10 +160,10 @@ public class JoinITCase extends RecordAPITestBase {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
-                               
PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, 
PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
+               String[] localStrategies = { 
Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE,
+                               Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, 
Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND };
 
-               String[] shipStrategies = { 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", 
"BROADCAST_SECOND"};
+               String[] shipStrategies = { 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", 
"BROADCAST_SECOND"};
 
                for (String localStrategy : localStrategies) {
                        for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
index 5f5f33d..3c8d372 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java
@@ -25,9 +25,9 @@ import 
org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import 
org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
@@ -119,9 +119,9 @@ public class ReduceITCase extends RecordAPITestBase {
                ReduceOperator testReducer = ReduceOperator.builder(new 
TestReducer(), StringValue.class, 0)
                        .build();
                
testReducer.setDegreeOfParallelism(config.getInteger("ReduceTest#NoSubtasks", 
1));
-               
testReducer.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY,
+               
testReducer.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY,
                                config.getString("ReduceTest#LocalStrategy", 
""));
-               
testReducer.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY,
+               
testReducer.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY,
                                config.getString("ReduceTest#ShipStrategy", 
""));
 
                FileDataSink output = new FileDataSink(
@@ -133,10 +133,10 @@ public class ReduceITCase extends RecordAPITestBase {
 
                Plan plan = new Plan(output);
 
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                OptimizedPlan op = pc.compile(plan);
 
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
 
        }
@@ -151,8 +151,8 @@ public class ReduceITCase extends RecordAPITestBase {
 
                LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
 
-               String[] localStrategies = { 
PactCompiler.HINT_LOCAL_STRATEGY_SORT };
-               String[] shipStrategies = { 
PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH };
+               String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT 
};
+               String[] shipStrategies = { 
Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH };
 
                for (String localStrategy : localStrategies) {
                        for (String shipStrategy : shipStrategies) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
index 4b9bc2a..944de98 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat;
@@ -124,10 +124,10 @@ public class UnionSinkITCase extends RecordAPITestBase {
                Plan plan = new Plan(output);
                plan.setDefaultParallelism(DOP);
 
-               PactCompiler pc = new PactCompiler(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics());
                OptimizedPlan op = pc.compile(plan);
 
-               NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
+               JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index 853a477..33e0807 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.util.CollectionDataSets.{CrazyNested, POJO, MutableTuple3,
 CustomType}
-import org.apache.flink.optimizer.PactCompiler
+import org.apache.flink.optimizer.Optimizer
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -324,7 +324,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas
     val ds =  CollectionDataSets.get3TupleDataSet(env).map(t => t).setParallelism(4)
 
     val cfg: Configuration = new Configuration
-    cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)
+    cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION)
 
     val reduceDs =  ds.reduceGroup(new Tuple3AllGroupReduceWithCombine).withParameters(cfg)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index 8ac52cb..969f970 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -54,17 +54,17 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {
@@ -124,17 +124,17 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {
@@ -197,23 +197,23 @@ class CustomPartitioningTest extends CompilerTestBase {
       val balancer = keyExtractor.getInput.getSource.asInstanceOf[SingleInputPlanNode]
       
       assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy)
-      assertEquals(parallelism, sink.getDegreeOfParallelism)
+      assertEquals(parallelism, sink.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy)
-      assertEquals(parallelism, mapper.getDegreeOfParallelism)
+      assertEquals(parallelism, mapper.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput.getShipStrategy)
-      assertEquals(parallelism, keyRemover.getDegreeOfParallelism)
+      assertEquals(parallelism, keyRemover.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy)
       assertEquals(part, partitioner.getInput.getPartitioner)
-      assertEquals(parallelism, partitioner.getDegreeOfParallelism)
+      assertEquals(parallelism, partitioner.getParallelism)
       
       assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput.getShipStrategy)
-      assertEquals(parallelism, keyExtractor.getDegreeOfParallelism)
+      assertEquals(parallelism, keyExtractor.getParallelism)
       
       assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy)
-      assertEquals(parallelism, balancer.getDegreeOfParallelism)
+      assertEquals(parallelism, balancer.getParallelism)
     }
     catch {
       case e: Exception => {

Reply via email to