http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java index 3cf081f..d6b9444 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java @@ -35,6 +35,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.SourcePlanNode; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Assert; import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java index f6885c5..3af64fc 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.optimizer.util.DummyInputFormat; import org.apache.flink.optimizer.util.DummyOutputFormat; import org.apache.flink.optimizer.util.IdentityReduce; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java index 230cc6b..25643a4 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java @@ -39,6 +39,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java index 1fe16bb..3a24ce1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java @@ -38,6 +38,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.PlanNode; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.util.Visitor; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java index 40b54e0..65e5025 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java @@ -20,6 +20,7 @@ package org.apache.flink.optimizer; import static org.junit.Assert.*; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.Partitioner; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java index 92b4fc5..1001626 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java @@ -31,6 +31,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java index 5d15ed8..2e52565 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.record.operators.FileDataSource; import org.apache.flink.api.java.record.operators.ReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.types.IntValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Visitor; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java index 1e4124c..e81e0ec 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.common.Plan; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import static org.junit.Assert.fail; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java index 80c0bda..321ca5a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java @@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java index 6e7c0a3..27f367f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java @@ -42,6 +42,7 @@ import org.apache.flink.optimizer.util.DummyOutputFormat; import org.apache.flink.optimizer.util.IdentityMap; import org.apache.flink.optimizer.util.IdentityReduce; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.types.LongValue; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java index 0273659..b4e95fb 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java @@ -26,13 +26,13 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java index 08f7388..d52181d 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java @@ -29,13 +29,13 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java index 9fd676f..5758c86 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java @@ -28,12 +28,12 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java index f865a9f..00fd587 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java @@ -27,11 +27,11 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java index 360487b..0408ca9 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java @@ -30,12 +30,12 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java index 8cd4809..74e5c8c 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java @@ -26,12 +26,12 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java index 779b8e5..72fb81b 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java @@ -29,12 +29,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyReducer; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java index eae40cf..8eedee1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java @@ -30,13 +30,13 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java index cb4bd78..5f69336 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; @@ -35,6 +34,7 @@ import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction; import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper; import org.apache.flink.optimizer.testfunctions.SelectOneReducer; import org.apache.flink.optimizer.testfunctions.Top1GroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.io.network.DataExchangeMode; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java index 5175d8c..13ec51a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java @@ -24,12 +24,12 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor; import org.apache.flink.optimizer.testfunctions.Top1GroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.io.network.DataExchangeMode; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java index 6b2691a..99f8c81 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java @@ -25,11 +25,11 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.io.network.DataExchangeMode; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java index b359e6b..ab83dba 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java @@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java; import static org.junit.Assert.fail; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; @@ -28,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.CompilerTestBase; @SuppressWarnings({"serial", "unchecked"}) http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java index 2f9b32f..96758b1 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java @@ -20,12 +20,12 @@ package org.apache.flink.optimizer.java; import static org.junit.Assert.*; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java index c0e2fa7..1bd4b8a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java @@ -24,12 +24,12 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.GroupReduceOperator; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.util.Collector; import org.junit.Test; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java index 57d2d54..796d4ab 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.BulkIterationPlanNode; import org.apache.flink.optimizer.plan.NAryUnionPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -36,6 +35,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java index 0a62132..14d863d 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java @@ -27,10 +27,10 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SinkPlanNode; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.util.Visitor; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java index cd63b72..3f18e62 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java @@ -27,8 +27,8 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.testfunctions.IdentityMapper; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java index 8bb9a76..95ee4de 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java @@ -27,11 +27,11 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java index e1b18f9..4197abb 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java @@ -23,10 +23,10 @@ import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java index f1c2233..46eb48a 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java @@ -33,11 +33,11 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.util.Collector; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java index e7807c9..fb7a80f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java @@ -25,10 +25,10 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction; import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings({"serial", "unchecked"}) http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java index 9171cc7..8a4786f 100644 --- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java @@ -25,9 +25,9 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.CompilerTestBase; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; @SuppressWarnings({"serial", "unchecked"}) http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java new file mode 100644 index 0000000..35c50d3 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.base.BulkIterationBase; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.util.OperatingSystem; +import org.apache.flink.util.Visitor; +import org.junit.Before; + +/** + * Base class for Optimizer tests. Offers utility methods to trigger optimization + * of a program and to fetch the nodes in an optimizer plan that correspond + * the the node in the program plan. + */ +public abstract class CompilerTestBase implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random"; + + protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null"; + + protected static final int DEFAULT_PARALLELISM = 8; + + protected static final String DEFAULT_PARALLELISM_STRING = String.valueOf(DEFAULT_PARALLELISM); + + private static final String CACHE_KEY = "cachekey"; + + // ------------------------------------------------------------------------ + + protected transient DataStatistics dataStats; + + protected transient Optimizer withStatsCompiler; + + protected transient Optimizer noStatsCompiler; + + private transient int statCounter; + + // ------------------------------------------------------------------------ + + @Before + public void setup() { + Configuration flinkConf = new Configuration(); + this.dataStats = new DataStatistics(); + this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator(), flinkConf); + this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); + + this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator(), flinkConf); + this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); + } + + // ------------------------------------------------------------------------ + + public OptimizedPlan compileWithStats(Plan p) { + return this.withStatsCompiler.compile(p); + } + + public OptimizedPlan compileNoStats(Plan p) { + return this.noStatsCompiler.compile(p); + } + + public static OperatorResolver getContractResolver(Plan plan) { + return new OperatorResolver(plan); + } + + public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) { + setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth)); + } + + public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) { + final String key = CACHE_KEY + this.statCounter++; + this.dataStats.cacheBaseStatistics(stats, key); + source.setStatisticsKey(key); + } + + + public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) { + return new OptimizerPlanNodeResolver(plan); + } + + // ------------------------------------------------------------------------ + + public static final class OptimizerPlanNodeResolver { + + private final Map<String, ArrayList<PlanNode>> map; + + public OptimizerPlanNodeResolver(OptimizedPlan p) { + HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>(); + + for (PlanNode n : p.getAllNodes()) { + Operator<?> c = n.getOriginalOptimizerNode().getOperator(); + String name = c.getName(); + + ArrayList<PlanNode> list = map.get(name); + if (list == null) { + list = new ArrayList<PlanNode>(2); + map.put(name, list); + } + + // check whether this node is a child of a node with the same contract (aka combiner) + boolean shouldAdd = true; + for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) { + PlanNode in = iter.next(); + if (in.getOriginalOptimizerNode().getOperator() == c) { + // is this the child or is our node the child + if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) { + SingleInputPlanNode thisNode = (SingleInputPlanNode) n; + SingleInputPlanNode otherNode = (SingleInputPlanNode) in; + + if (thisNode.getPredecessor() == otherNode) { + // other node is child, remove it + iter.remove(); + } else if (otherNode.getPredecessor() == thisNode) { + shouldAdd = false; + } + } else { + throw new RuntimeException("Unrecodnized case in test."); + } + } + } + + if (shouldAdd) { + list.add(n); + } + } + + this.map = map; + } + + + @SuppressWarnings("unchecked") + public <T extends PlanNode> T getNode(String name) { + List<PlanNode> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No node found with the given name."); + } else if (nodes.size() != 1) { + throw new RuntimeException("Multiple nodes found with the given name."); + } else { + return (T) nodes.get(0); + } + } + + @SuppressWarnings("unchecked") + public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) { + List<PlanNode> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No node found with the given name and stub class."); + } else { + PlanNode found = null; + for (PlanNode node : nodes) { + if (node.getClass() == stubClass) { + if (found == null) { + found = node; + } else { + throw new RuntimeException("Multiple nodes found with the given name and stub class."); + } + } + } + if (found == null) { + throw new RuntimeException("No node found with the given name and stub class."); + } else { + return (T) found; + } + } + } + + public List<PlanNode> getNodes(String name) { + List<PlanNode> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No node found with the given name."); + } else { + return new ArrayList<PlanNode>(nodes); + } + } + } + + /** + * Collects all DataSources of a plan to add statistics + * + */ + public static class SourceCollectorVisitor implements Visitor<Operator<?>> { + + protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4); + + @Override + public boolean preVisit(Operator<?> visitable) { + + if(visitable instanceof GenericDataSourceBase) { + sources.add((GenericDataSourceBase<?, ?>) visitable); + } + else if(visitable instanceof BulkIterationBase) { + ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this); + } + + return true; + } + + @Override + public void postVisit(Operator<?> visitable) {} + + public List<GenericDataSourceBase<?, ?>> getSources() { + return this.sources; + } + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java new file mode 100644 index 0000000..920b713 --- /dev/null +++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.java.record.operators.BulkIteration; +import org.apache.flink.api.java.record.operators.DeltaIteration; +import org.apache.flink.util.Visitor; + +/** + * Utility to get operator instances from plans via name. + */ +@SuppressWarnings("deprecation") +public class OperatorResolver implements Visitor<Operator<?>> { + + private final Map<String, List<Operator<?>>> map; + private Set<Operator<?>> seen; + + public OperatorResolver(Plan p) { + this.map = new HashMap<String, List<Operator<?>>>(); + this.seen = new HashSet<Operator<?>>(); + + p.accept(this); + this.seen = null; + } + + + @SuppressWarnings("unchecked") + public <T extends Operator<?>> T getNode(String name) { + List<Operator<?>> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No nodes found with the given name."); + } else if (nodes.size() != 1) { + throw new RuntimeException("Multiple nodes found with the given name."); + } else { + return (T) nodes.get(0); + } + } + + @SuppressWarnings("unchecked") + public <T extends Operator<?>> T getNode(String name, Class<? extends RichFunction> stubClass) { + List<Operator<?>> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No node found with the given name and stub class."); + } else { + Operator<?> found = null; + for (Operator<?> node : nodes) { + if (node.getClass() == stubClass) { + if (found == null) { + found = node; + } else { + throw new RuntimeException("Multiple nodes found with the given name and stub class."); + } + } + } + if (found == null) { + throw new RuntimeException("No node found with the given name and stub class."); + } else { + return (T) found; + } + } + } + + public List<Operator<?>> getNodes(String name) { + List<Operator<?>> nodes = this.map.get(name); + if (nodes == null || nodes.isEmpty()) { + throw new RuntimeException("No node found with the given name."); + } else { + return new ArrayList<Operator<?>>(nodes); + } + } + + @Override + public boolean preVisit(Operator<?> visitable) { + if (this.seen.add(visitable)) { + // add to the map + final String name = visitable.getName(); + List<Operator<?>> list = this.map.get(name); + if (list == null) { + list = new ArrayList<Operator<?>>(2); + this.map.put(name, list); + } + list.add(visitable); + + // recurse into bulk iterations + if (visitable instanceof BulkIteration) { + ((BulkIteration) visitable).getNextPartialSolution().accept(this); + } else if (visitable instanceof DeltaIteration) { + ((DeltaIteration) visitable).getSolutionSetDelta().accept(this); + ((DeltaIteration) visitable).getNextWorkset().accept(this); + } + + return true; + } else { + return false; + } + } + + @Override + public void postVisit(Operator<?> visitable) {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-quickstart/pom.xml ---------------------------------------------------------------------- diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml index 78b99ad..4f3b4b2 100644 --- a/flink-quickstart/pom.xml +++ b/flink-quickstart/pom.xml @@ -59,7 +59,7 @@ under the License. <artifactId>maven-archetype-plugin</artifactId> <version>2.2</version><!--$NO-MVN-MAN-VER$--> <configuration> - <skip>${skipTests}</skip> + <skip>true</skip> </configuration> </plugin> <!-- deactivate the shade plugin for the quickstart archetypes --> http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java index fd91d65..f345d6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java @@ -29,7 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.util.InstantiationUtil; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index cb799c4..1c76e08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil; import org.slf4j.LoggerFactory; import org.slf4j.Logger; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE; import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 78e9707..626d21f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index b2f11db..50b1f24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -29,7 +29,7 @@ import java.net.Socket; import java.net.SocketException; import java.security.MessageDigest; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 5db5ef6..69687da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.blob; import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.slf4j.Logger; import java.io.EOFException; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java index 9f72db0..842870d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.client; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * An exception which is thrown by the JobClient if a job is aborted as a result of a user http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java index 99c3f89..56ccef5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.client; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * This exception is the base exception for all exceptions that denote any failure during http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java index d2c81a5..c33ed8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.client; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java index 3d672a5..3cb0b9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.client; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * This exception denotes an error while submitting a job to the JobManager http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java index 10ef601..18c8c31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.client; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * An exception which is thrown by the JobClient if the job manager is no longer reachable. http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index f7518bd..5d96903 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 556bb11..503a0b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memorymanager.MemoryManager; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java index c6270b2..3fb7493 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index d394e2d..848f619 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -35,7 +35,7 @@ import java.util.TimerTask; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java index 532107f..66bda45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java index 63d85b4..52a8048 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import java.io.File; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d0615b3..5ce89b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index ad72d13..acbc17a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 8dd341f..1b91089 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index b838aa4..3449100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -38,7 +38,7 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.IOUtils; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index e27b7ea..324629f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -27,7 +27,7 @@ import java.util.Set; import akka.actor.ActorRef; import org.apache.flink.util.AbstractID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java index eab4344..7bd70a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.util.AbstractID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment; import java.util.HashSet; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java index c89b7f5..56be9d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.util.AbstractID; import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.Locality; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java index 08d6441..bf8464c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.util.AbstractID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d490784..55b89b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.dispatch.OnFailure; import akka.pattern.Patterns; import akka.util.Timeout; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -35,7 +36,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 92e27d3..88020a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.taskmanager.TaskManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java index 0ea3a1c..6e84b4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java @@ -18,7 +18,8 @@ package org.apache.flink.runtime.io.network.partition; -import org.apache.flink.runtime.jobgraph.JobID; + +import org.apache.flink.api.common.JobID; public interface ResultPartitionConsumableNotifier { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index f8f22b0..d43533b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java deleted file mode 100644 index 7c8d365..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.jobgraph; - -import javax.xml.bind.DatatypeConverter; - -import org.apache.flink.util.AbstractID; - -import java.nio.ByteBuffer; - -public final class JobID extends AbstractID { - - private static final long serialVersionUID = 1L; - - public JobID() { - super(); - } - - public JobID(long lowerPart, long upperPart) { - super(lowerPart, upperPart); - } - - public JobID(byte[] bytes) { - super(bytes); - } - - public static JobID generate() { - return new JobID(); - } - - public static JobID fromByteArray(byte[] bytes) { - return new JobID(bytes); - } - - public static JobID fromByteBuffer(ByteBuffer buf) { - long lower = buf.getLong(); - long upper = buf.getLong(); - return new JobID(lower, upper); - } - - public static JobID fromHexString(String hexString) { - return new JobID(DatatypeConverter.parseHexBinary(hexString)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java index 3130354..5bf8ebe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java @@ -24,7 +24,7 @@ import java.util.LinkedList; import java.util.Map; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * This class manages the accumulators for different jobs. Either the jobs are http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java index ffb0dd4..008be65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java @@ -56,7 +56,7 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.util.EnvironmentInformation; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java index 8f16e4f..4c4a20e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.profiling; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * This interface must be implemented by profiling components http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java index 85dedb7..d5ce137 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.Map; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;