http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java deleted file mode 100644 index 3ce021b..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationTerminationWithTwoTails.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.iterative; - -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.io.TextInputFormat; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.util.RecordAPITestBase; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.junit.Assert; - -@SuppressWarnings("deprecation") -public class IterationTerminationWithTwoTails extends RecordAPITestBase { - - private static final String INPUT = "1\n" + "2\n" + "3\n" + "4\n" + "5\n"; - private static final String EXPECTED = "22\n"; - - protected String dataPath; - protected String resultPath; - - public IterationTerminationWithTwoTails(){ - setTaskManagerNumSlots(parallelism); - } - - @Override - protected void preSubmit() throws Exception { - dataPath = createTempFile("datapoints.txt", INPUT); - resultPath = getTempFilePath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED, resultPath); - } - - @Override - protected Plan getTestJob() { - return getTestPlanPlan(parallelism, dataPath, resultPath); - } - - private static Plan getTestPlanPlan(int numSubTasks, String input, String output) { - - FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(initialInput); - iteration.setMaximumNumberOfIterations(5); - Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1); - - 
ReduceOperator sumReduce = ReduceOperator.builder(new SumReducer()) - .input(iteration.getPartialSolution()) - .name("Compute sum (Reduce)") - .build(); - - iteration.setNextPartialSolution(sumReduce); - - MapOperator terminationMapper = MapOperator.builder(new TerminationMapper()) - .input(iteration.getPartialSolution()) - .name("Compute termination criterion (Map)") - .build(); - - iteration.setTerminationCriterion(terminationMapper); - - FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output"); - CsvOutputFormat.configureRecordFormat(finalResult) - .recordDelimiter('\n') - .fieldDelimiter(' ') - .field(StringValue.class, 0); - - Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)"); - plan.setDefaultParallelism(4); - Assert.assertTrue(plan.getDefaultParallelism() > 1); - return plan; - } - - static final class SumReducer extends ReduceFunction implements Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - // Compute the sum - int sum = 0; - - while (it.hasNext()) { - sum += Integer.parseInt(it.next().getField(0, StringValue.class).getValue()) + 1; - } - - out.collect(new Record(new StringValue(Integer.toString(sum)))); - } - } - - public static class TerminationMapper extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> collector) { - - int currentSum = Integer.parseInt(record.getField(0, StringValue.class).getValue()); - - if(currentSum < 21) - collector.collect(record); - } - } - -}
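The deleted test above exercised a bulk iteration that can stop before its maximum iteration count through a separate termination criterion. For orientation, the same pattern expressed in the Java DataSet API that the remaining tests in this commit are migrated to might look roughly like the sketch below. This is illustrative only, not code from the commit — the class name and constants are invented; IterativeDataSet.closeWith(result, terminationCriterion) ends the loop as soon as the criterion data set becomes empty at the end of a superstep.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class IterationTerminationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);

        IterativeDataSet<Integer> iteration = input.iterate(5).name("Loop");

        // Keyless (all-)reduce, like the deleted SumReducer: add one to
        // every element while summing them up.
        DataSet<Integer> sum = iteration
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) { return value + 1; }
            })
            .reduce(new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer a, Integer b) { return a + b; }
            }).name("Compute sum (Reduce)");

        // Like the deleted TerminationMapper: while this set is non-empty,
        // the loop keeps running (up to the maximum of 5 iterations).
        DataSet<Integer> terminationCriterion = iteration
            .filter(new FilterFunction<Integer>() {
                @Override
                public boolean filter(Integer value) { return value < 21; }
            }).name("Compute termination criterion");

        iteration.closeWith(sum, terminationCriterion).print();
    }
}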
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java index cb16c15..ab66f31 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java @@ -18,91 +18,34 @@ package org.apache.flink.test.iterative; -import java.io.Serializable; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.io.TextInputFormat; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.test.util.RecordAPITestBase; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.junit.Assert; - -@SuppressWarnings("deprecation") -public class IterationWithAllReducerITCase extends RecordAPITestBase { - - private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"; +import java.util.List; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class IterationWithAllReducerITCase extends JavaProgramTestBase { private static final String EXPECTED = "1\n"; - protected String dataPath; - protected String resultPath; - - public IterationWithAllReducerITCase(){ - setTaskManagerNumSlots(4); - } - @Override - protected void preSubmit() throws Exception { - dataPath = createTempFile("datapoints.txt", INPUT); - resultPath = getTempFilePath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(EXPECTED, resultPath); - } - - @Override - protected Plan getTestJob() { - Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath); - return plan; - } + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); - - private static Plan getTestPlanPlan(int numSubTasks, String input, String output) { + DataSet<String> initialInput = env.fromElements("1", "1", "1", "1", "1", "1", "1", "1"); - FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input"); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(initialInput); - iteration.setMaximumNumberOfIterations(5); - - Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1); + IterativeDataSet<String> iteration = initialInput.iterate(5).name("Loop"); - ReduceOperator sumReduce = ReduceOperator.builder(new PickOneReducer()) - .input(iteration.getPartialSolution()) - .name("Compute sum (Reduce)") - .build(); - - 
iteration.setNextPartialSolution(sumReduce); + DataSet<String> sumReduce = iteration.reduce(new ReduceFunction<String>(){ + @Override + public String reduce(String value1, String value2) throws Exception { + return value1; + } + }).name("Compute sum (Reduce)"); - FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, iteration, "Output"); - CsvOutputFormat.configureRecordFormat(finalResult) - .recordDelimiter('\n') - .fieldDelimiter(' ') - .field(StringValue.class, 0); + List<String> result = iteration.closeWith(sumReduce).collect(); - Plan plan = new Plan(finalResult, "Iteration with AllReducer (keyless Reducer)"); - - plan.setDefaultParallelism(numSubTasks); - Assert.assertTrue(plan.getDefaultParallelism() > 1); - - return plan; - } - - public static final class PickOneReducer extends ReduceFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - out.collect(it.next()); - } + compareResultAsText(result, EXPECTED); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java index c11c9ea..c283df1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithChainingITCase.java @@ -18,43 +18,25 @@ package org.apache.flink.test.iterative; -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat; -import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat; -import org.apache.flink.test.util.RecordAPITestBase; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.util.PointFormatter; +import org.apache.flink.test.util.PointInFormat; +import org.apache.flink.test.util.CoordVector; +import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; -@SuppressWarnings("deprecation") -@RunWith(Parameterized.class) -public class IterationWithChainingITCase extends RecordAPITestBase { +public class 
IterationWithChainingITCase extends JavaProgramTestBase { private static final String DATA_POINTS = "0|50.90|16.20|72.08|\n" + "1|73.65|61.76|62.89|\n" + "2|61.73|49.95|92.74|\n"; private String dataPath; private String resultPath; - public IterationWithChainingITCase(Configuration config) { - super(config); - setTaskManagerNumSlots(parallelism); - } - @Override protected void preSubmit() throws Exception { dataPath = createTempFile("data_points.txt", DATA_POINTS); @@ -62,63 +44,35 @@ public class IterationWithChainingITCase extends RecordAPITestBase { } @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(DATA_POINTS, resultPath); - } - - @Override - protected Plan getTestJob() { - return getTestPlan(config.getInteger("ChainedMapperITCase#NoSubtasks", 1), dataPath, resultPath); - } + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); - @Parameters - public static Collection<Object[]> getConfigurations() { - Configuration config1 = new Configuration(); - config1.setInteger("ChainedMapperITCase#NoSubtasks", parallelism); - return toParameterList(config1); - } + DataSet<Tuple2<Integer, CoordVector>> initialInput + = env.readFile(new PointInFormat(), dataPath).setParallelism(1).name("Input"); - public static final class IdentityMapper extends MapFunction implements Serializable { + IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2).name("Loop"); - private static final long serialVersionUID = 1L; + DataSet<Tuple2<Integer, CoordVector>> identity + = iteration.groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() { + @Override + public void reduce(Iterable<Tuple2<Integer, CoordVector>> values, Collector<Tuple2<Integer, CoordVector>> out) throws Exception { + for (Tuple2<Integer, CoordVector> value : values) { + out.collect(value); + } + } + }).map(new MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>() { + @Override + public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> value) throws Exception { + return value; + } - @Override - public void map(Record rec, Collector<Record> out) { - out.collect(rec); - } - } + }); - public static final class DummyReducer extends ReduceFunction implements Serializable { + iteration.closeWith(identity).writeAsFormattedText(resultPath, new PointFormatter()); - private static final long serialVersionUID = 1L; + env.execute("Iteration with chained map test"); - @Override - public void reduce(Iterator<Record> it, Collector<Record> out) { - while (it.hasNext()) { - out.collect(it.next()); - } - } - } - - static Plan getTestPlan(int numSubTasks, String input, String output) { - - FileDataSource initialInput = new FileDataSource(new PointInFormat(), input, "Input"); - initialInput.setParallelism(1); - - BulkIteration iteration = new BulkIteration("Loop"); - iteration.setInput(initialInput); - iteration.setMaximumNumberOfIterations(2); - - ReduceOperator dummyReduce = ReduceOperator.builder(new DummyReducer(), IntValue.class, 0).input(iteration.getPartialSolution()) - .name("Reduce something").build(); - - MapOperator dummyMap = MapOperator.builder(new IdentityMapper()).input(dummyReduce).build(); - iteration.setNextPartialSolution(dummyMap); - - FileDataSink finalResult = new FileDataSink(new PointOutFormat(), output, iteration, "Output"); - - Plan plan = new Plan(finalResult, "Iteration with chained map test"); - 
plan.setDefaultParallelism(numSubTasks); - return plan; + compareResultsByLinesInMemory(DATA_POINTS, resultPath); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java index 2a4a4b7..8756429 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithUnionITCase.java @@ -25,10 +25,11 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat; -import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.util.CoordVector; +import org.apache.flink.test.util.PointFormatter; +import org.apache.flink.test.util.PointInFormat; import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.flink.types.Record; import org.apache.flink.util.Collector; public class IterationWithUnionITCase extends JavaProgramTestBase { @@ -54,32 +55,32 @@ public class IterationWithUnionITCase extends JavaProgramTestBase { protected void testProgram() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Record> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1); + DataSet<Tuple2<Integer, CoordVector>> initialInput = env.readFile(new PointInFormat(), this.dataPath).setParallelism(1); - IterativeDataSet<Record> iteration = initialInput.iterate(2); + IterativeDataSet<Tuple2<Integer, CoordVector>> iteration = initialInput.iterate(2); - DataSet<Record> result = iteration.union(iteration).map(new IdentityMapper()); + DataSet<Tuple2<Integer, CoordVector>> result = iteration.union(iteration).map(new IdentityMapper()); - iteration.closeWith(result).write(new PointOutFormat(), this.resultPath); + iteration.closeWith(result).writeAsFormattedText(this.resultPath, new PointFormatter()); env.execute(); } - static final class IdentityMapper implements MapFunction<Record, Record>, Serializable { + static final class IdentityMapper implements MapFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable { private static final long serialVersionUID = 1L; @Override - public Record map(Record rec) { + public Tuple2<Integer, CoordVector> map(Tuple2<Integer, CoordVector> rec) { return rec; } } - static class DummyReducer implements GroupReduceFunction<Record, Record>, Serializable { + static class DummyReducer implements GroupReduceFunction<Tuple2<Integer, CoordVector>, Tuple2<Integer, CoordVector>>, Serializable { private static final long serialVersionUID = 1L; @Override - public void reduce(Iterable<Record> it, Collector<Record> out) { - for (Record r : it) { + public void reduce(Iterable<Tuple2<Integer, CoordVector>> it, Collector<Tuple2<Integer, CoordVector>> out) { + for (Tuple2<Integer, CoordVector> r : it) { out.collect(r); } } 
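The iteration tests migrated above all converge on the same JavaProgramTestBase shape: the RecordAPITestBase plumbing (getTestJob(), postSubmit(), slot setup, parameterized configurations) collapses into a single testProgram() method that builds the job on an ExecutionEnvironment and checks the result directly. Reduced to its skeleton — a sketch of mine, with an invented class name and data, not code from the commit — the pattern is:

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;

public class MinimalMigratedITCase extends JavaProgramTestBase {

    @Override
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataSet<String> data = env.fromElements("1");

        // collect() executes the program and ships the result back to the
        // client, so no sink files or postSubmit() hook are needed.
        List<String> result = data.collect();

        compareResultAsText(result, "1\n");
    }
}

Tests that read or write files, such as IterationWithChainingITCase above, keep preSubmit() for temp-file setup and simply move the output comparison to the end of testProgram().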
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java deleted file mode 100644 index ac3659a..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterativeKMeansITCase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.iterative; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast; -import org.apache.flink.test.testdata.KMeansData; -import org.apache.flink.test.util.RecordAPITestBase; - - -public class IterativeKMeansITCase extends RecordAPITestBase { - - protected String dataPath; - protected String clusterPath; - protected String resultPath; - - public IterativeKMeansITCase(){ - setTaskManagerNumSlots(parallelism); - } - - @Override - protected void preSubmit() throws Exception { - dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS); - clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS); - resultPath = getTempDirPath("result"); - } - - @Override - protected Plan getTestJob() { - KMeansBroadcast kmi = new KMeansBroadcast(); - return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20"); - } - - - @Override - protected void postSubmit() throws Exception { - List<String> resultLines = new ArrayList<String>(); - readAllResultLines(resultLines, resultPath); - - KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java deleted file mode 100644 index fcf43df..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/KMeansITCase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.iterative; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast; -import org.apache.flink.test.testdata.KMeansData; -import org.apache.flink.test.util.RecordAPITestBase; - - -public class KMeansITCase extends RecordAPITestBase { - - protected String dataPath; - protected String clusterPath; - protected String resultPath; - - public KMeansITCase(){ - setTaskManagerNumSlots(parallelism); - } - - @Override - protected void preSubmit() throws Exception { - dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS); - clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS); - resultPath = getTempDirPath("result"); - } - - @Override - protected Plan getTestJob() { - KMeansBroadcast kmi = new KMeansBroadcast(); - return kmi.getPlan(String.valueOf(parallelism), dataPath, clusterPath, resultPath, "20"); - } - - - @Override - protected void postSubmit() throws Exception { - List<String> resultLines = new ArrayList<String>(); - readAllResultLines(resultLines, resultPath); - - KMeansData.checkResultsWithDelta(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultLines, 0.1); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java index ab8ff45..959a17a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java @@ -21,11 +21,24 @@ package org.apache.flink.test.optimizer.examples; import static org.junit.Assert.*; import java.util.Arrays; +import java.util.Collection; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.FileDataSource; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import 
org.apache.flink.client.program.OptimizerPlanEnvironment; +import org.apache.flink.client.program.PreviewPlanEnvironment; +import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; import org.apache.flink.optimizer.plan.SinkPlanNode; @@ -34,11 +47,10 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.optimizer.util.OperatorResolver; -import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -@SuppressWarnings("deprecation") public class KMeansSingleStepTest extends CompilerTestBase { private static final String DATAPOINTS = "Data Points"; @@ -54,16 +66,15 @@ public class KMeansSingleStepTest extends CompilerTestBase { @Test public void testCompileKMeansSingleStepWithStats() { - - KMeansSingleStep kmi = new KMeansSingleStep(); - Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20)); + + Plan p = getKMeansPlan(); p.setExecutionConfig(new ExecutionConfig()); // set the statistics OperatorResolver cr = getContractResolver(p); - FileDataSource pointsSource = cr.getNode(DATAPOINTS); - FileDataSource centersSource = cr.getNode(CENTERS); - setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f); - setSourceStatistics(centersSource, 1024*1024, 32f); + GenericDataSourceBase pointsSource = cr.getNode(DATAPOINTS); + GenericDataSourceBase centersSource = cr.getNode(CENTERS); + setSourceStatistics(pointsSource, 100l * 1024 * 1024 * 1024, 32f); + setSourceStatistics(centersSource, 1024 * 1024, 32f); OptimizedPlan plan = compileWithStats(p); checkPlan(plan); @@ -71,9 +82,8 @@ public class KMeansSingleStepTest extends CompilerTestBase { @Test public void testCompileKMeansSingleStepWithOutStats() { - - KMeansSingleStep kmi = new KMeansSingleStep(); - Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20)); + + Plan p = getKMeansPlan(); p.setExecutionConfig(new ExecutionConfig()); OptimizedPlan plan = compileNoStats(p); checkPlan(plan); @@ -97,7 +107,7 @@ public class KMeansSingleStepTest extends CompilerTestBase { assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy()); assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy()); - assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy()); + assertEquals(DriverStrategy.MAP, mapper.getDriverStrategy()); assertNull(mapper.getInput().getLocalStrategyKeys()); assertNull(mapper.getInput().getLocalStrategySortOrder()); @@ -127,4 +137,145 @@ public class KMeansSingleStepTest extends CompilerTestBase { assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy()); } + + public static Plan getKMeansPlan() { + // prepare the test environment + PreviewPlanEnvironment env = new PreviewPlanEnvironment(); + env.setAsContext(); + try { + KMeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"}); + } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { + // all good. 
+ } catch (Exception e) { + e.printStackTrace(); + Assert.fail("KMeans failed with an exception"); + } + return env.getPlan(); + } + + public static void KMeans(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Point> points = env.readCsvFile(args[0]) + .fieldDelimiter(" ") + .includeFields(true, true) + .types(Double.class, Double.class) + .name(DATAPOINTS) + .map(new MapFunction<Tuple2<Double, Double>, Point>() { + @Override + public Point map(Tuple2<Double, Double> value) throws Exception { + return new Point(value.f0, value.f1); + } + }); + + DataSet<Centroid> centroids = env.readCsvFile(args[1]) + .fieldDelimiter(" ") + .includeFields(true, true, true) + .types(Integer.class, Double.class, Double.class) + .name(CENTERS) + .map(new MapFunction<Tuple3<Integer, Double, Double>, Centroid>() { + @Override + public Centroid map(Tuple3<Integer, Double, Double> value) throws Exception { + return new Centroid(value.f0, value.f1, value.f2); + } + }); + + DataSet<Tuple3<Integer, Point, Integer>> newCentroids = points + .map(new SelectNearestCenter()).name(MAPPER_NAME).withBroadcastSet(centroids, "centroids"); + + DataSet<Tuple3<Integer, Point, Integer>> recomputeClusterCenter + = newCentroids.groupBy(0).reduceGroup(new RecomputeClusterCenter()).name(REDUCER_NAME); + + recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK); + + env.execute("KMeans Example"); + } + + public static class Point extends Tuple2<Double, Double> { + public Point(double x, double y) { + this.f0 = x; + this.f1 = y; + } + + public Point add(Point other) { + f0 += other.f0; + f1 += other.f1; + return this; + } + + public Point div(long val) { + f0 /= val; + f1 /= val; + return this; + } + + public double euclideanDistance(Point other) { + return Math.sqrt((f0 - other.f0) * (f0 - other.f0) + (f1 - other.f1) * (f1 - other.f1)); + } + + public double euclideanDistance(Centroid other) { + return Math.sqrt((f0 - other.f1.f0) * (f0 - other.f1.f0) + (f1 - other.f1.f1) * (f1 - other.f1.f1)); + } + } + + public static class Centroid extends Tuple2<Integer, Point> { + public Centroid(int id, double x, double y) { + this.f0 = id; + this.f1 = new Point(x, y); + } + + public Centroid(int id, Point p) { + this.f0 = id; + this.f1 = p; + } + } + + /** + * Determines the closest cluster center for a data point. 
+ */ + public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple3<Integer, Point, Integer>> { + private Collection<Centroid> centroids; + + @Override + public void open(Configuration parameters) throws Exception { + this.centroids = getRuntimeContext().getBroadcastVariable("centroids"); + } + + @Override + public Tuple3<Integer, Point, Integer> map(Point p) throws Exception { + double minDistance = Double.MAX_VALUE; + int closestCentroidId = -1; + for (Centroid centroid : centroids) { + double distance = p.euclideanDistance(centroid); + if (distance < minDistance) { + minDistance = distance; + closestCentroidId = centroid.f0; + } + } + return new Tuple3<>(closestCentroidId, p, 1); + } + } + + @Combinable + public static final class RecomputeClusterCenter extends RichGroupReduceFunction<Tuple3<Integer, Point, Integer>, Tuple3<Integer, Point, Integer>> { + @Override + public void reduce(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception { + int id = -1; + double x = 0; + double y = 0; + int count = 0; + for (Tuple3<Integer, Point, Integer> value : values) { + id = value.f0; + x += value.f1.f0; + y += value.f1.f1; + count += value.f2; + } + out.collect(new Tuple3<>(id, new Point(x, y), count)); + } + + @Override + public void combine(Iterable<Tuple3<Integer, Point, Integer>> values, Collector<Tuple3<Integer, Point, Integer>> out) throws Exception { + reduce(values, out); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java index f4efb8a..e929913 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java @@ -22,10 +22,23 @@ import java.util.Arrays; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; +import org.apache.flink.api.common.operators.DualInputOperator; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.SingleInputOperator; import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.api.java.record.operators.JoinOperator; -import org.apache.flink.api.java.record.operators.MapOperator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.client.program.OptimizerPlanEnvironment; +import 
org.apache.flink.client.program.PreviewPlanEnvironment; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.SingleInputPlanNode; @@ -35,23 +48,25 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.optimizer.util.CompilerTestBase; import org.apache.flink.optimizer.util.OperatorResolver; -import org.apache.flink.test.recordJobs.relational.TPCHQuery3; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; /** * Tests TPCH Q3 (simplified) under various input conditions. */ -@SuppressWarnings("deprecation") +@SuppressWarnings("serial") public class RelationalQueryCompilerTest extends CompilerTestBase { private static final String ORDERS = "Orders"; private static final String LINEITEM = "LineItems"; private static final String MAPPER_NAME = "FilterO"; private static final String JOIN_NAME = "JoinLiO"; + private static final String REDUCE_NAME = "AggLiO"; + private static final String SINK = "Output"; private final FieldList set0 = new FieldList(0); - private final FieldList set01 = new FieldList(new int[] {0,1}); + private final FieldList set01 = new FieldList(0,1); private final ExecutionConfig defaultExecutionConfig = new ExecutionConfig(); // ------------------------------------------------------------------------ @@ -63,8 +78,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { @Test public void testQueryNoStatistics() { try { - TPCHQuery3 query = new TPCHQuery3(); - Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE); + Plan p = getTPCH3Plan(); p.setExecutionConfig(defaultExecutionConfig); // compile final OptimizedPlan plan = compileNoStats(p); @@ -72,12 +86,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan); // get the nodes from the final plan - final SinkPlanNode sink = or.getNode("Output"); - final SingleInputPlanNode reducer = or.getNode("AggLio"); + final SinkPlanNode sink = or.getNode(SINK); + final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME); final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ? 
(SingleInputPlanNode) reducer.getPredecessor() : null; - final DualInputPlanNode join = or.getNode("JoinLiO"); - final SingleInputPlanNode filteringMapper = or.getNode("FilterO"); + final DualInputPlanNode join = or.getNode(JOIN_NAME); + final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME); // verify the optimizer choices checkStandardStrategies(filteringMapper, join, combiner, reducer, sink); @@ -95,7 +109,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { */ @Test public void testQueryAnyValidPlan() { - testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, true, true, true, false, true); + testQueryGeneric(1024*1024*1024L, 8*1024*1024*1024L, 0.05f, 0.05f, true, true, true, false, true); } /** @@ -103,7 +117,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { */ @Test public void testQueryWithSizeZeroInputs() { - testQueryGeneric(0, 0, 0.5f, true, true, true, false, true); + testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true); } /** @@ -111,7 +125,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { */ @Test public void testQueryWithStatsForBroadcastHash() { - testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.05f, true, false, true, false, false); + testQueryGeneric(1024l*1024*1024*1024, 1024l*1024*1024*1024, 0.01f, 0.05f, true, false, true, false, false); } /** @@ -119,7 +133,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { */ @Test public void testQueryWithStatsForRepartitionAny() { - testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.5f, false, true, true, true, true); + testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.1f, 0.5f, false, true, true, true, true); } /** @@ -128,34 +142,23 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { */ @Test public void testQueryWithStatsForRepartitionMerge() { - TPCHQuery3 query = new TPCHQuery3(); - Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE); + Plan p = getTPCH3Plan(); p.setExecutionConfig(defaultExecutionConfig); // set compiler hints OperatorResolver cr = getContractResolver(p); - JoinOperator match = cr.getNode("JoinLiO"); + DualInputOperator<?,?,?,?> match = cr.getNode(JOIN_NAME); match.getCompilerHints().setFilterFactor(100f); - testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.05f, 100f, false, true, false, false, true); + testQueryGeneric(100l*1024*1024*1024*1024, 100l*1024*1024*1024*1024, 0.01f, 100f, false, true, false, false, true); } // ------------------------------------------------------------------------ - - private void testQueryGeneric(long orderSize, long lineItemSize, - float ordersFilterFactor, - boolean broadcastOkay, boolean partitionedOkay, - boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) - { - testQueryGeneric(orderSize, lineItemSize, ordersFilterFactor, ordersFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay); - } - private void testQueryGeneric(long orderSize, long lineItemSize, float ordersFilterFactor, float joinFilterFactor, boolean broadcastOkay, boolean partitionedOkay, boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) { - TPCHQuery3 query = new TPCHQuery3(); - Plan p = query.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE); + Plan p = getTPCH3Plan(); p.setExecutionConfig(defaultExecutionConfig); testQueryGeneric(p, orderSize, lineItemSize, 
ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay); } @@ -168,10 +171,10 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { try { // set statistics OperatorResolver cr = getContractResolver(p); - FileDataSource ordersSource = cr.getNode(ORDERS); - FileDataSource lineItemSource = cr.getNode(LINEITEM); - MapOperator mapper = cr.getNode(MAPPER_NAME); - JoinOperator joiner = cr.getNode(JOIN_NAME); + GenericDataSourceBase<?,?> ordersSource = cr.getNode(ORDERS); + GenericDataSourceBase<?,?> lineItemSource = cr.getNode(LINEITEM); + SingleInputOperator<?,?,?> mapper = cr.getNode(MAPPER_NAME); + DualInputOperator<?,?,?,?> joiner = cr.getNode(JOIN_NAME); setSourceStatistics(ordersSource, orderSize, 100f); setSourceStatistics(lineItemSource, lineitemSize, 140f); mapper.getCompilerHints().setAvgOutputRecordSize(16f); @@ -183,12 +186,12 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan); // get the nodes from the final plan - final SinkPlanNode sink = or.getNode("Output"); - final SingleInputPlanNode reducer = or.getNode("AggLio"); + final SinkPlanNode sink = or.getNode(SINK); + final SingleInputPlanNode reducer = or.getNode(REDUCE_NAME); final SingleInputPlanNode combiner = reducer.getPredecessor() instanceof SingleInputPlanNode ? (SingleInputPlanNode) reducer.getPredecessor() : null; - final DualInputPlanNode join = or.getNode("JoinLiO"); - final SingleInputPlanNode filteringMapper = or.getNode("FilterO"); + final DualInputPlanNode join = or.getNode(JOIN_NAME); + final SingleInputPlanNode filteringMapper = or.getNode(MAPPER_NAME); checkStandardStrategies(filteringMapper, join, combiner, reducer, sink); @@ -230,7 +233,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { // ------------------------------------------------------------------------ // Checks for special conditions // ------------------------------------------------------------------------ - + private void checkStandardStrategies(SingleInputPlanNode map, DualInputPlanNode join, SingleInputPlanNode combiner, SingleInputPlanNode reducer, SinkPlanNode sink) { @@ -239,7 +242,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy()); // check the driver strategies that are always fix - Assert.assertEquals(DriverStrategy.COLLECTOR_MAP, map.getDriverStrategy()); + Assert.assertEquals(DriverStrategy.FLAT_MAP, map.getDriverStrategy()); Assert.assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy()); Assert.assertEquals(DriverStrategy.NONE, sink.getDriverStrategy()); if (combiner != null) { @@ -348,4 +351,73 @@ public class RelationalQueryCompilerTest extends CompilerTestBase { return false; } } + + public static Plan getTPCH3Plan() { + // prepare the test environment + PreviewPlanEnvironment env = new PreviewPlanEnvironment(); + env.setAsContext(); + try { + TCPH3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE}); + } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { + // all good. 
+ } catch (Exception e) { + e.printStackTrace(); + Assert.fail("TCPH3 failed with an exception"); + } + return env.getPlan(); + } + + public static void TCPH3(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(Integer.parseInt(args[0])); + + //order id, order status, order data, order prio, ship prio + DataSet<Tuple5<Long, String, String, String, Integer>> orders + = env.readCsvFile(args[1]) + .fieldDelimiter("|").lineDelimiter("\n") + .includeFields("101011001").types(Long.class, String.class, String.class, String.class, Integer.class) + .name(ORDERS); + + //order id, extended price + DataSet<Tuple2<Long, Double>> lineItems + = env.readCsvFile(args[2]) + .fieldDelimiter("|").lineDelimiter("\n") + .includeFields("100001").types(Long.class, Double.class) + .name(LINEITEM); + + DataSet<Tuple2<Long, Integer>> filterO = orders.flatMap(new FilterO()).name(MAPPER_NAME); + + DataSet<Tuple3<Long, Integer, Double>> joinLiO = filterO.join(lineItems).where(0).equalTo(0).with(new JoinLiO()).name(JOIN_NAME); + + DataSet<Tuple3<Long, Integer, Double>> aggLiO = joinLiO.groupBy(0, 1).reduceGroup(new AggLiO()).name(REDUCE_NAME); + + aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK); + + env.execute(); + } + + @ForwardedFields("f0; f4->f1") + public static class FilterO implements FlatMapFunction<Tuple5<Long, String, String, String, Integer>, Tuple2<Long, Integer>> { + @Override + public void flatMap(Tuple5<Long, String, String, String, Integer> value, Collector<Tuple2<Long, Integer>> out) throws Exception { + // not going to be executed + } + } + + @ForwardedFieldsFirst("f0; f1") + public static class JoinLiO implements FlatJoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Double>, Tuple3<Long, Integer, Double>> { + @Override + public void join(Tuple2<Long, Integer> first, Tuple2<Long, Double> second, Collector<Tuple3<Long, Integer, Double>> out) throws Exception { + // not going to be executed + } + } + + @ForwardedFields("f0; f1") + @Combinable + public static class AggLiO extends RichGroupReduceFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> { + @Override + public void reduce(Iterable<Tuple3<Long, Integer, Double>> values, Collector<Tuple3<Long, Integer, Double>> out) throws Exception { + // not going to be executed + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java index 99402a5..e134c7a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java @@ -20,7 +20,19 @@ package org.apache.flink.test.optimizer.iterations; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.java.DataSet; +import 
org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException; +import org.apache.flink.client.program.PreviewPlanEnvironment; import org.apache.flink.optimizer.dag.TempMode; import org.apache.flink.optimizer.plan.DualInputPlanNode; import org.apache.flink.optimizer.plan.OptimizedPlan; @@ -33,13 +45,14 @@ import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.test.recordJobs.graph.ConnectedComponentsWithCoGroup; +import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; /** * */ +@SuppressWarnings("serial") public class ConnectedComponentsCoGroupTest extends CompilerTestBase { private static final String VERTEX_SOURCE = "Vertices"; @@ -59,10 +72,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase { @Test public void testWorksetConnectedComponents() { - ConnectedComponentsWithCoGroup cc = new ConnectedComponentsWithCoGroup(); - - Plan plan = cc.getPlan(String.valueOf(DEFAULT_PARALLELISM), - IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100)); + Plan plan = getConnectedComponentsCoGroupPlan(); plan.setExecutionConfig(new ExecutionConfig()); OptimizedPlan optPlan = compileNoStats(plan); OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(optPlan); @@ -134,4 +144,68 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase { JobGraphGenerator jgg = new JobGraphGenerator(); jgg.compileJobGraph(optPlan); } + + public static Plan getConnectedComponentsCoGroupPlan() { + // prepare the test environment + PreviewPlanEnvironment env = new PreviewPlanEnvironment(); + env.setAsContext(); + try { + ConnectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"}); + } catch (ProgramAbortException pae) { + // all good. 
+ } catch (Exception e) { + e.printStackTrace(); + Assert.fail("ConnectedComponentsWithCoGroup failed with an exception"); + } + return env.getPlan(); + } + + public static void ConnectedComponentsWithCoGroup(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(Integer.parseInt(args[0])); + + DataSet<Tuple1<Long>> initialVertices = env.readCsvFile(args[1]).types(Long.class).name(VERTEX_SOURCE); + + DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(args[2]).types(Long.class, Long.class).name(EDGES_SOURCE); + + DataSet<Tuple2<Long, Long>> verticesWithId = initialVertices.flatMap(new DummyMapFunction()); + + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration + = verticesWithId.iterateDelta(verticesWithId, Integer.parseInt(args[4]), 0).name(ITERATION_NAME); + + DataSet<Tuple2<Long, Long>> joinWithNeighbors = iteration.getWorkset().join(edges) + .where(0).equalTo(0) + .with(new DummyJoinFunction()).name(JOIN_NEIGHBORS_MATCH); + + DataSet<Tuple2<Long, Long>> minAndUpdate = joinWithNeighbors.coGroup(iteration.getSolutionSet()) + .where(0).equalTo(0) + .with(new DummyCoGroupFunction()).name(MIN_ID_AND_UPDATE); + + iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK); + + env.execute(); + } + + public static class DummyMapFunction implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> { + @Override + public void flatMap(Tuple1<Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { + // won't be executed + } + } + + public static class DummyJoinFunction implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + @Override + public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> second, Collector<Tuple2<Long, Long>> out) throws Exception { + // won't be executed + } + } + + @ForwardedFieldsFirst("f0->f0") + @ForwardedFieldsSecond("f0->f0") + public static class DummyCoGroupFunction implements CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + @Override + public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple2<Long, Long>> second, Collector<Tuple2<Long, Long>> out) throws Exception { + // won't be executed + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java deleted file mode 100644 index 3785270..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/IterativeKMeansTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.optimizer.iterations; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.operators.util.FieldList; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.optimizer.util.CompilerTestBase; -import org.apache.flink.optimizer.util.OperatorResolver; -import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast; -import org.junit.Assert; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class IterativeKMeansTest extends CompilerTestBase { - - private static final String DATAPOINTS = "Data Points"; - private static final String CENTERS = "Centers"; - - private static final String MAPPER_NAME = "Find Nearest Centers"; - private static final String REDUCER_NAME = "Recompute Center Positions"; - - private static final String ITERATION_NAME = "k-means loop"; - - private static final String SINK = "New Center Positions"; - - private final FieldList set0 = new FieldList(0); - - // -------------------------------------------------------------------------------------------- - // K-Means (Bulk Iteration) - // -------------------------------------------------------------------------------------------- - - @Test - public void testCompileKMeansSingleStepWithStats() { - - KMeansBroadcast kmi = new KMeansBroadcast(); - Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20)); - p.setExecutionConfig(new ExecutionConfig()); - // set the statistics - OperatorResolver cr = getContractResolver(p); - FileDataSource pointsSource = cr.getNode(DATAPOINTS); - FileDataSource centersSource = cr.getNode(CENTERS); - setSourceStatistics(pointsSource, 100l*1024*1024*1024, 32f); - setSourceStatistics(centersSource, 1024*1024, 32f); - - OptimizedPlan plan = compileWithStats(p); - checkPlan(plan); - - new JobGraphGenerator().compileJobGraph(plan); - } - - @Test - public void testCompileKMeansSingleStepWithOutStats() { - - KMeansBroadcast kmi = new KMeansBroadcast(); - Plan p = kmi.getPlan(String.valueOf(DEFAULT_PARALLELISM), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(20)); - p.setExecutionConfig(new ExecutionConfig()); - OptimizedPlan plan = compileNoStats(p); - checkPlan(plan); - - new JobGraphGenerator().compileJobGraph(plan); - } - - private void checkPlan(OptimizedPlan plan) { - - 
-        OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
-
-        final SinkPlanNode sink = or.getNode(SINK);
-        final SingleInputPlanNode reducer = or.getNode(REDUCER_NAME);
-        final SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-        final SingleInputPlanNode mapper = or.getNode(MAPPER_NAME);
-
-        final BulkIterationPlanNode iter = or.getNode(ITERATION_NAME);
-
-        // -------------------- outside the loop -----------------------
-
-        // check the sink
-        assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-        assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
-
-        // check the iteration
-        assertEquals(ShipStrategyType.FORWARD, iter.getInput().getShipStrategy());
-        assertEquals(LocalStrategy.NONE, iter.getInput().getLocalStrategy());
-
-
-        // -------------------- inside the loop -----------------------
-
-        // check the mapper
-        assertEquals(1, mapper.getBroadcastInputs().size());
-        assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-        assertEquals(ShipStrategyType.BROADCAST, mapper.getBroadcastInputs().get(0).getShipStrategy());
-        assertFalse(mapper.getInput().isOnDynamicPath());
-        assertTrue(mapper.getBroadcastInputs().get(0).isOnDynamicPath());
-        assertTrue(mapper.getInput().getTempMode().isCached());
-
-        assertEquals(LocalStrategy.NONE, mapper.getInput().getLocalStrategy());
-        assertEquals(LocalStrategy.NONE, mapper.getBroadcastInputs().get(0).getLocalStrategy());
-
-        assertEquals(DriverStrategy.COLLECTOR_MAP, mapper.getDriverStrategy());
-
-        assertNull(mapper.getInput().getLocalStrategyKeys());
-        assertNull(mapper.getInput().getLocalStrategySortOrder());
-        assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategyKeys());
-        assertNull(mapper.getBroadcastInputs().get(0).getLocalStrategySortOrder());
-
-        // check the combiner
-        Assert.assertNotNull(combiner);
-        assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-        assertTrue(combiner.getInput().isOnDynamicPath());
-
-        assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
-        assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-        assertNull(combiner.getInput().getLocalStrategyKeys());
-        assertNull(combiner.getInput().getLocalStrategySortOrder());
-        assertEquals(set0, combiner.getKeys(0));
-        assertEquals(set0, combiner.getKeys(1));
-
-        // check the reducer
-        assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-        assertTrue(reducer.getInput().isOnDynamicPath());
-        assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
-        assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-        assertEquals(set0, reducer.getKeys(0));
-        assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-        assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-    }
-}
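The deleted test pinned down a characteristic plan shape: the mapper's regular input is FORWARD, off the dynamic path, and cached across supersteps, while its broadcast input (the evolving centers) is BROADCAST and on the dynamic path. A hypothetical minimal job (not from this commit, and not an exact reproduction of KMeansBroadcast) that produces the same broadcast-into-a-bulk-iteration shape:

    import java.util.List;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;
    import org.apache.flink.configuration.Configuration;

    public class BroadcastLoopSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Long> staticData = env.fromElements(1L, 2L, 3L);        // like the data points
            IterativeDataSet<Long> loop = env.fromElements(0L).iterate(20); // like the centers

            DataSet<Long> nextPartialSolution = staticData
                    .map(new RichMapFunction<Long, Long>() {
                        private long state;

                        @Override
                        public void open(Configuration parameters) {
                            List<Long> bc = getRuntimeContext().getBroadcastVariable("loopState");
                            state = bc.get(0);
                        }

                        @Override
                        public Long map(Long value) {
                            return value + state;
                        }
                    })
                    .withBroadcastSet(loop, "loopState") // the BROADCAST input on the dynamic path
                    .reduce(new ReduceFunction<Long>() {
                        @Override
                        public Long reduce(Long a, Long b) {
                            return a + b;
                        }
                    });

            loop.closeWith(nextPartialSolution).print();
        }
    }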
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 24d9416..d186cbb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -25,13 +25,12 @@ import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -45,17 +44,34 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 
 	@Test
 	public void dumpWordCount() {
-		dump(new WordCount().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WordCount failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
 	public void dumpTPCH3() {
-		dump(new TPCHQuery3().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
-	}
-
-	@Test
-	public void dumpKMeans() {
-		dump(new KMeansSingleStep().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("TPCH3 failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
@@ -64,7 +80,6 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 		env.setAsContext();
 		try {
-			// <points path> <centers path> <result path> <num iterations
 			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
 		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
@@ -77,17 +92,50 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 
 	@Test
 	public void dumpWebLogAnalysis() {
-		dump(new WebLogAnalysis().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WebLogAnalysis failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
 	public void dumpBulkIterationKMeans() {
-		dump(new KMeansBroadcast().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("ConnectedComponents failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
-	public void dumpDeltaPageRank() {
-		dump(new DeltaPageRankWithInitialDeltas().getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
+	public void dumpPageRank() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("PageRank failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	private void dump(Plan p) {
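Both this file and PreviewPlanDumpTest below capture plans the same way: a PreviewPlanEnvironment is installed as the context environment, the example's main() assembles its plan on it, and the environment aborts the call with a ProgramAbortException instead of executing the job. The pattern distilled (SomeExample is a placeholder name, not from the commit):

    PreviewPlanEnvironment env = new PreviewPlanEnvironment();
    env.setAsContext();
    try {
        // main() builds the plan on the context environment, then aborts
        SomeExample.main(new String[] {IN_FILE, OUT_FILE});
    } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
        // expected: the plan was captured without running the job
    } catch (Exception e) {
        e.printStackTrace();
        Assert.fail("SomeExample failed with an exception");
    }
    Plan plan = env.getPlan(); // ready to optimize or dump as JSON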
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index 49fe6d8..95a06c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -21,16 +21,17 @@ package org.apache.flink.test.optimizer.jsonplan;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
+import org.apache.flink.examples.java.graph.PageRankBasic;
+import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.test.recordJobs.graph.DeltaPageRankWithInitialDeltas;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast;
-import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
-import org.apache.flink.test.recordJobs.relational.TPCHQuery3;
-import org.apache.flink.test.recordJobs.relational.WebLogAnalysis;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
@@ -40,51 +41,102 @@ import org.junit.Test;
 
 /*
  * The tests in this class simply invoke the JSON dump code for the original plan.
 */
-public class PreviewPlanDumpTest {
+public class PreviewPlanDumpTest extends CompilerTestBase {
 
-	protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/test/file" : "file:///test/file";
-
-	protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/test/output" : "file:///test/output";
-
-	protected static final String[] NO_ARGS = new String[0];
-
 	@Test
 	public void dumpWordCount() {
-		dump(new WordCount().getPlan("4", IN_FILE, OUT_FILE));
-
-		// The web interface passes empty string-args to compute the preview of the
-		// job, so we should test this situation too
-		dump(new WordCount().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			WordCount.main(new String[] {IN_FILE, OUT_FILE});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WordCount failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
 	public void dumpTPCH3() {
-		dump(new TPCHQuery3().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
-		dump(new TPCHQuery3().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			TPCHQuery3.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("TPCH3 failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
-	public void dumpKMeans() {
-		dump(new KMeansSingleStep().getPlan("4", IN_FILE, IN_FILE, OUT_FILE));
-		dump(new KMeansSingleStep().getPlan(NO_ARGS));
+	public void dumpIterativeKMeans() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("KMeans failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
 	public void dumpWebLogAnalysis() {
-		dump(new WebLogAnalysis().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE));
-		dump(new WebLogAnalysis().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("WebLogAnalysis failed with an exception");
+		}
+		dump(env.getPlan());
 	}
-
+
 	@Test
 	public void dumpBulkIterationKMeans() {
-		dump(new KMeansBroadcast().getPlan("4", IN_FILE, OUT_FILE));
-		dump(new KMeansBroadcast().getPlan(NO_ARGS));
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			ConnectedComponents.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("ConnectedComponents failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	@Test
-	public void dumpDeltaPageRank() {
-		dump(new DeltaPageRankWithInitialDeltas().getPlan("4", IN_FILE, IN_FILE, IN_FILE, OUT_FILE, "10"));
-		dump(new DeltaPageRankWithInitialDeltas().getPlan(NO_ARGS));
+	public void dumpPageRank() {
+		// prepare the test environment
+		PreviewPlanEnvironment env = new PreviewPlanEnvironment();
+		env.setAsContext();
+		try {
+			PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
+			// all good.
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("PageRank failed with an exception");
+		}
+		dump(env.getPlan());
 	}
 
 	private void dump(Plan p) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 310ded8..ec2dbb7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -29,8 +29,6 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.junit.After;
 import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.test.util.JavaProgramTestBase;
 
 @Ignore
 public class NetworkStackThroughputITCase {
@@ -62,8 +62,8 @@ public class NetworkStackThroughputITCase {
 
 	// ------------------------------------------------------------------------
 
-	// wrapper to reuse RecordAPITestBase code in runs via main()
-	private static class TestBaseWrapper extends RecordAPITestBase {
+	// wrapper to reuse JavaProgramTestBase code in runs via main()
+	private static class TestBaseWrapper extends JavaProgramTestBase {
 
 		private int dataVolumeGb;
 		private boolean useForwarder;
@@ -90,7 +90,6 @@ public class NetworkStackThroughputITCase {
 			setTaskManagerNumSlots(numSlots);
 		}
 
-		@Override
 		protected JobGraph getJobGraph() throws Exception {
 			return createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
 		}
@@ -138,19 +137,19 @@ public class NetworkStackThroughputITCase {
 			return jobGraph;
 		}
 
-		@After
-		public void calculateThroughput() {
-			if (getJobExecutionResult() != null) {
-				int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
-				long dataVolumeMbit = dataVolumeGb * 8192;
-				long runtimeSecs = getJobExecutionResult().getNetRuntime(TimeUnit.SECONDS);
+		@Override
+		protected void testProgram() throws Exception {
+			JobExecutionResult jer = executor.submitJobAndWait(getJobGraph(), false);
+			int dataVolumeGb = this.config.getInteger(DATA_VOLUME_GB_CONFIG_KEY, 1);
+
+			long dataVolumeMbit = dataVolumeGb * 8192;
+			long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);
-				int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
+			int mbitPerSecond = (int) (((double) dataVolumeMbit) / runtimeSecs);
-				LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
-						"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
-			}
+			LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, " +
+					"data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
 		}
 	}
 
@@ -289,8 +288,7 @@ public class NetworkStackThroughputITCase {
 			TestBaseWrapper test = new TestBaseWrapper(config);
 
 			System.out.println(Arrays.toString(p));
-			test.testJob();
-			test.calculateThroughput();
+			test.testProgram();
 		}
 	}
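The throughput formula in testProgram() converts gigabytes to megabits (1 GB = 8 * 1024 Mbit, hence the factor 8192) and divides by the job's net runtime in seconds. A worked check of the arithmetic, as a hypothetical helper that is not in the commit:

    // e.g. 10 GB in 40 s: 10 * 8192 = 81920 Mbit, and 81920 / 40 = 2048 MBit/s
    static int mbitPerSecond(int dataVolumeGb, long runtimeSecs) {
        long dataVolumeMbit = dataVolumeGb * 8192L;
        return (int) ((double) dataVolumeMbit / runtimeSecs);
    }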
http://git-wip-us.apache.org/repos/asf/flink/blob/8abae2c2/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java b/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
deleted file mode 100644
index 0cff9b6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/testPrograms/util/tests/IntTupleDataInFormatTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.testPrograms.util.tests;
-
-import org.apache.flink.test.recordJobs.util.IntTupleDataInFormat;
-import org.apache.flink.test.recordJobs.util.Tuple;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class IntTupleDataInFormatTest
-{
-	@Test
-	public void testReadLineKeyValuePairOfIntValueTupleByteArray() {
-
-		String[] testTuples = {
-			"1|attribute1|attribute2|3|attribute4|5|",
-			"2|3|",
-			"3|attribute1|attribute2|",
-			"-1|attr1|attr2|",
-			"-2|attribute1|attribute2|",
-			Integer.MAX_VALUE+"|attr1|attr2|attr3|attr4|",
-			Integer.MIN_VALUE+"|attr1|attr2|attr3|attr4|"
-		};
-
-		int[] expectedKeys = {
-			1,2,3,-1,-2,Integer.MAX_VALUE,Integer.MIN_VALUE
-		};
-
-		int[] expectedAttrCnt = {6,2,3,3,3,5,5};
-
-		IntTupleDataInFormat inFormat = new IntTupleDataInFormat();
-		Record rec = new Record();
-
-		for(int i = 0; i < testTuples.length; i++) {
-
-			byte[] tupleBytes = testTuples[i].getBytes();
-
-			inFormat.readRecord(rec, tupleBytes, 0, tupleBytes.length);
-
-			Assert.assertTrue("Expected Key: "+expectedKeys[i]+" != Returned Key: "+rec.getField(0, IntValue.class), rec.getField(0, IntValue.class).equals(new IntValue(expectedKeys[i])));
-			Assert.assertTrue("Expected Attr Cnt: "+expectedAttrCnt[i]+" != Returned Attr Cnt: "+rec.getField(1, Tuple.class), rec.getField(1, Tuple.class).getNumberOfColumns() == expectedAttrCnt[i]);
-		}
-	}
-}
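For reference, the contract the deleted test exercised: IntTupleDataInFormat parses a pipe-terminated line, storing the leading integer as the record's key (field 0) and the full column set as a Tuple (field 1), so "1|attribute1|attribute2|3|attribute4|5|" yields key 1 and 6 columns. A stand-alone illustration of that key/column-count expectation, using hypothetical helpers rather than the format's actual code:

    // the key is everything before the first '|'
    static int key(String line) {
        return Integer.parseInt(line.substring(0, line.indexOf('|')));
    }

    // every column, including the last, is terminated by '|'
    static int columnCount(String line) {
        int count = 0;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == '|') {
                count++;
            }
        }
        return count;
    }

    // key("1|attribute1|attribute2|3|attribute4|5|") == 1, columnCount(...) == 6
    // key("2|3|") == 2, columnCount("2|3|") == 2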