http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java index 5a0a6fc..bf2bbae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java @@ -32,7 +32,7 @@ import org.apache.flink.optimizer.plan.BulkIterationPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; @@ -77,7 +77,7 @@ public class IterativeKMeansTest extends CompilerTestBase { OptimizedPlan plan = compileWithStats(p); checkPlan(plan); - new NepheleJobGraphGenerator().compileJobGraph(plan); + new JobGraphGenerator().compileJobGraph(plan); } @Test @@ -89,7 +89,7 @@ public class IterativeKMeansTest extends CompilerTestBase { OptimizedPlan plan = compileNoStats(p); checkPlan(plan); - new NepheleJobGraphGenerator().compileJobGraph(plan); + new JobGraphGenerator().compileJobGraph(plan); } private void checkPlan(OptimizedPlan plan) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java index cc8f788..e6a1e69 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/MultipleJoinsWithSolutionSetCompilerTest.java @@ -39,7 +39,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SolutionSetPlanNode; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; @SuppressWarnings("serial") public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase { @@ -79,7 +79,7 @@ public class MultipleJoinsWithSolutionSetCompilerTest extends CompilerTestBase { assertEquals(SolutionSetPlanNode.class, join1.getInput1().getSource().getClass()); assertEquals(SolutionSetPlanNode.class, join2.getInput2().getSource().getClass()); - new NepheleJobGraphGenerator().compileJobGraph(optPlan); + new JobGraphGenerator().compileJobGraph(optPlan); } catch (Exception e) { System.err.println(e.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java index 731e344..f05bd25 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -68,7 +68,7 @@ public class PageRankCompilerTest extends CompilerTestBase{ IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(10); Configuration cfg = new Configuration(); - cfg.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); + cfg.setString(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND); DataSet<Tuple2<Long, Double>> newRanks = iteration // join pages with outgoing edges and distribute rank http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java index 1cf92e6..b348333 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/plandump/PreviewPlanDumpTest.java @@ -21,7 +21,7 @@ package org.apache.flink.test.compiler.plandump; import java.util.List; import org.apache.flink.api.common.Plan; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas; @@ -89,7 +89,7 @@ public class PreviewPlanDumpTest { private void dump(Plan p) { try { - List<DataSinkNode> sinks = PactCompiler.createPreOptimizedPlan(p); + List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p); PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator(); String json = dumper.getPactPlanAsJSON(sinks); JsonParser parser = new JsonFactory().createJsonParser(json); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java index b88eb4e..41b84e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleJavaPrograms/WordCountITCase.java @@ -27,11 +27,6 @@ public class WordCountITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; - public WordCountITCase(){ - setDegreeOfParallelism(4); - setNumTaskManagers(2); - setTaskManagerNumSlots(2); - } @Override protected void preSubmit() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java index a5c2bb4..9021c6a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java @@ -24,9 +24,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.MapOperator; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat; @@ -88,11 +88,11 @@ public class TaskFailureITCase extends FailingTestBase { plan.setDefaultParallelism(DOP); // optimize and compile plan - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); OptimizedPlan op = pc.compile(plan); // return job graph of failing job - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } @@ -118,11 +118,11 @@ public class TaskFailureITCase extends FailingTestBase { plan.setDefaultParallelism(4); // optimize and compile plan - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); OptimizedPlan op = pc.compile(plan); // return job graph of working job - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java index 037610e..a0e3468 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java @@ -39,12 +39,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; - public WordCountMapredITCase(){ -// setDegreeOfParallelism(4); -// setNumTaskManagers(2); -// setTaskManagerNumSlots(2); - } - @Override protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT); @@ -59,7 +53,6 @@ public class WordCountMapredITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -// env.setDegreeOfParallelism(1); DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java index 3bdaa22..fee49bf 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java @@ -39,12 +39,6 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; - public WordCountMapreduceITCase(){ -// setDegreeOfParallelism(4); -// setNumTaskManagers(2); -// setTaskManagerNumSlots(2); - } - @Override protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java index 79d49aa..e5f91b4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java @@ -31,7 +31,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CrazyNested; @@ -377,7 +377,7 @@ public class GroupReduceITCase extends MultipleProgramsTestBase { .map(new IdentityMapper<Tuple3<Integer, Long, String>>()).setParallelism(4); Configuration cfg = new Configuration(); - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION); + cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION); DataSet<Tuple2<Integer, String>> reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine()) .withParameters(cfg); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java index d92897d..220611d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupITCase.java @@ -32,7 +32,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat; import org.apache.flink.api.java.record.operators.CoGroupOperator; import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.util.RecordAPITestBase; import org.apache.flink.types.IntValue; @@ -156,9 +156,9 @@ public class CoGroupITCase extends RecordAPITestBase { CoGroupOperator testCoGrouper = CoGroupOperator.builder(new TestCoGrouper(), StringValue.class, 0, 0) .build(); testCoGrouper.setDegreeOfParallelism(config.getInteger("CoGroupTest#NoSubtasks", 1)); - testCoGrouper.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY, + testCoGrouper.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY, config.getString("CoGroupTest#LocalStrategy", "")); - testCoGrouper.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY, + testCoGrouper.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY, config.getString("CoGroupTest#ShipStrategy", "")); FileDataSink output = new FileDataSink(new CoGroupOutFormat(), resultPath); @@ -181,9 +181,9 @@ public class CoGroupITCase extends RecordAPITestBase { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE }; + String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE }; - String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH }; + String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH }; for (String localStrategy : localStrategies) { for (String shipStrategy : shipStrategies) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java index f72d146..f6b4127 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CrossITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.record.io.DelimitedInputFormat; import org.apache.flink.api.java.record.operators.CrossOperator; import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat; @@ -132,20 +132,20 @@ public class CrossITCase extends RecordAPITestBase { CrossOperator testCross = CrossOperator.builder(new TestCross()).build(); testCross.setDegreeOfParallelism(config.getInteger("CrossTest#NoSubtasks", 1)); - testCross.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY, + testCross.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY, config.getString("CrossTest#LocalStrategy", "")); if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_FIRST")) { - testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_BROADCAST); - testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_FORWARD); + testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, + Optimizer.HINT_SHIP_STRATEGY_BROADCAST); + testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, + Optimizer.HINT_SHIP_STRATEGY_FORWARD); } else if (config.getString("CrossTest#ShipStrategy", "").equals("BROADCAST_SECOND")) { - testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_BROADCAST); - testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_FORWARD); + testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, + Optimizer.HINT_SHIP_STRATEGY_BROADCAST); + testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, + Optimizer.HINT_SHIP_STRATEGY_FORWARD); } else { - testCross.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY, + testCross.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY, config.getString("CrossTest#ShipStrategy", "")); } @@ -170,10 +170,10 @@ public class CrossITCase extends RecordAPITestBase { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST, - PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND, - PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST, - PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND }; + String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST, + Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND, + Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST, + Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND }; String[] shipStrategies = { "BROADCAST_FIRST", "BROADCAST_SECOND" // PactCompiler.HINT_SHIP_STRATEGY_BROADCAST http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java index 6906d36..02a6e38 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/JoinITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.DelimitedInputFormat; import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat; @@ -122,20 +122,20 @@ public class JoinITCase extends RecordAPITestBase { JoinOperator testMatcher = JoinOperator.builder(new TestMatcher(), StringValue.class, 0, 0) .build(); testMatcher.setDegreeOfParallelism(config.getInteger("MatchTest#NoSubtasks", 1)); - testMatcher.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY, + testMatcher.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY, config.getString("MatchTest#LocalStrategy", "")); if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_FIRST")) { - testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_BROADCAST); - testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_FORWARD); + testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, + Optimizer.HINT_SHIP_STRATEGY_BROADCAST); + testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, + Optimizer.HINT_SHIP_STRATEGY_FORWARD); } else if (config.getString("MatchTest#ShipStrategy", "").equals("BROADCAST_SECOND")) { - testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_FORWARD); - testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, - PactCompiler.HINT_SHIP_STRATEGY_BROADCAST); + testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, + Optimizer.HINT_SHIP_STRATEGY_FORWARD); + testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, + Optimizer.HINT_SHIP_STRATEGY_BROADCAST); } else { - testMatcher.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY, + testMatcher.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY, config.getString("MatchTest#ShipStrategy", "")); } @@ -160,10 +160,10 @@ public class JoinITCase extends RecordAPITestBase { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE, - PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND }; + String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE, + Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST, Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND }; - String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"}; + String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH, "BROADCAST_FIRST", "BROADCAST_SECOND"}; for (String localStrategy : localStrategies) { for (String shipStrategy : shipStrategies) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java index 5f5f33d..3c8d372 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ReduceITCase.java @@ -25,9 +25,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; @@ -119,9 +119,9 @@ public class ReduceITCase extends RecordAPITestBase { ReduceOperator testReducer = ReduceOperator.builder(new TestReducer(), StringValue.class, 0) .build(); testReducer.setDegreeOfParallelism(config.getInteger("ReduceTest#NoSubtasks", 1)); - testReducer.getParameters().setString(PactCompiler.HINT_LOCAL_STRATEGY, + testReducer.getParameters().setString(Optimizer.HINT_LOCAL_STRATEGY, config.getString("ReduceTest#LocalStrategy", "")); - testReducer.getParameters().setString(PactCompiler.HINT_SHIP_STRATEGY, + testReducer.getParameters().setString(Optimizer.HINT_SHIP_STRATEGY, config.getString("ReduceTest#ShipStrategy", "")); FileDataSink output = new FileDataSink( @@ -133,10 +133,10 @@ public class ReduceITCase extends RecordAPITestBase { Plan plan = new Plan(output); - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); OptimizedPlan op = pc.compile(plan); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } @@ -151,8 +151,8 @@ public class ReduceITCase extends RecordAPITestBase { LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - String[] localStrategies = { PactCompiler.HINT_LOCAL_STRATEGY_SORT }; - String[] shipStrategies = { PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH }; + String[] localStrategies = { Optimizer.HINT_LOCAL_STRATEGY_SORT }; + String[] shipStrategies = { Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH }; for (String localStrategy : localStrategies) { for (String shipStrategy : shipStrategies) { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java index 4b9bc2a..944de98 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/UnionSinkITCase.java @@ -28,9 +28,9 @@ import org.apache.flink.api.java.record.operators.FileDataSink; import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.MapOperator; import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.PactCompiler; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plantranslate.NepheleJobGraphGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; @@ -124,10 +124,10 @@ public class UnionSinkITCase extends RecordAPITestBase { Plan plan = new Plan(output); plan.setDefaultParallelism(DOP); - PactCompiler pc = new PactCompiler(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics()); OptimizedPlan op = pc.compile(plan); - NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator(); + JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala index 853a477..33e0807 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala @@ -25,7 +25,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.scala.util.CollectionDataSets.{CrazyNested, POJO, MutableTuple3, CustomType} -import org.apache.flink.optimizer.PactCompiler +import org.apache.flink.optimizer.Optimizer import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode @@ -324,7 +324,7 @@ class GroupReduceITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas val ds = CollectionDataSets.get3TupleDataSet(env).map(t => t).setParallelism(4) val cfg: Configuration = new Configuration - cfg.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION) + cfg.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION) val reduceDs = ds.reduceGroup(new Tuple3AllGroupReduceWithCombine).withParameters(cfg) http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala index 8ac52cb..969f970 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala @@ -54,17 +54,17 @@ class CustomPartitioningTest extends CompilerTestBase { val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode] assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) - assertEquals(parallelism, sink.getDegreeOfParallelism) + assertEquals(parallelism, sink.getParallelism) assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy) - assertEquals(parallelism, mapper.getDegreeOfParallelism) + assertEquals(parallelism, mapper.getParallelism) assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy) assertEquals(part, partitioner.getInput.getPartitioner) - assertEquals(parallelism, partitioner.getDegreeOfParallelism) + assertEquals(parallelism, partitioner.getParallelism) assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy) - assertEquals(parallelism, balancer.getDegreeOfParallelism) + assertEquals(parallelism, balancer.getParallelism) } catch { case e: Exception => { @@ -124,17 +124,17 @@ class CustomPartitioningTest extends CompilerTestBase { val balancer = partitioner.getInput.getSource.asInstanceOf[SingleInputPlanNode] assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) - assertEquals(parallelism, sink.getDegreeOfParallelism) + assertEquals(parallelism, sink.getParallelism) assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy) - assertEquals(parallelism, mapper.getDegreeOfParallelism) + assertEquals(parallelism, mapper.getParallelism) assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy) assertEquals(part, partitioner.getInput.getPartitioner) - assertEquals(parallelism, partitioner.getDegreeOfParallelism) + assertEquals(parallelism, partitioner.getParallelism) assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy) - assertEquals(parallelism, balancer.getDegreeOfParallelism) + assertEquals(parallelism, balancer.getParallelism) } catch { case e: Exception => { @@ -197,23 +197,23 @@ class CustomPartitioningTest extends CompilerTestBase { val balancer = keyExtractor.getInput.getSource.asInstanceOf[SingleInputPlanNode] assertEquals(ShipStrategyType.FORWARD, sink.getInput.getShipStrategy) - assertEquals(parallelism, sink.getDegreeOfParallelism) + assertEquals(parallelism, sink.getParallelism) assertEquals(ShipStrategyType.FORWARD, mapper.getInput.getShipStrategy) - assertEquals(parallelism, mapper.getDegreeOfParallelism) + assertEquals(parallelism, mapper.getParallelism) assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput.getShipStrategy) - assertEquals(parallelism, keyRemover.getDegreeOfParallelism) + assertEquals(parallelism, keyRemover.getParallelism) assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput.getShipStrategy) assertEquals(part, partitioner.getInput.getPartitioner) - assertEquals(parallelism, partitioner.getDegreeOfParallelism) + assertEquals(parallelism, partitioner.getParallelism) assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput.getShipStrategy) - assertEquals(parallelism, keyExtractor.getDegreeOfParallelism) + assertEquals(parallelism, keyExtractor.getParallelism) assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput.getShipStrategy) - assertEquals(parallelism, balancer.getDegreeOfParallelism) + assertEquals(parallelism, balancer.getParallelism) } catch { case e: Exception => {