[FLINK-1532] [tests] Fix spurious failure in AggregatorsITCase (plus minor cleanups)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22b71c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22b71c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22b71c Branch: refs/heads/master Commit: 0a22b71c887749d297e0f00f4bbdc4af58832a48 Parents: 1dafd81 Author: Stephan Ewen <se...@apache.org> Authored: Fri Feb 13 12:22:35 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Feb 13 17:11:13 2015 +0100 ---------------------------------------------------------------------- .../aggregators/AggregatorsITCase.java | 25 ++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a22b71c/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java index 63cac17..9dcf6fc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java @@ -37,16 +37,15 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.IterativeDataSet; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test the functionality of aggregators in bulk and delta iterative cases. - * */ @RunWith(Parameterized.class) public class AggregatorsITCase extends MultipleProgramsTestBase { @@ -287,7 +286,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); } @@ -319,10 +317,16 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { @SuppressWarnings("serial") public static final class TupleMakerMap extends RichMapFunction<Integer, Tuple2<Integer, Integer>> { + private Random rnd; + @Override - public Tuple2<Integer, Integer> map(Integer value) throws Exception { - Random ran = new Random(); - Integer nodeId = Integer.valueOf(ran.nextInt(100000)); + public void open(Configuration parameters){ + rnd = new Random(0xC0FFEBADBEEFDEADL + getRuntimeContext().getIndexOfThisSubtask()); + } + + @Override + public Tuple2<Integer, Integer> map(Integer value) { + Integer nodeId = Integer.valueOf(rnd.nextInt(100000)); return new Tuple2<Integer, Integer>(nodeId, value); } @@ -337,7 +341,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { @Override public void open(Configuration conf) { - aggr = getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR); superstep = getIterationRuntimeContext().getSuperstepNumber(); @@ -366,15 +369,13 @@ public class AggregatorsITCase extends MultipleProgramsTestBase { private int superstep; @Override - public void open(Configuration conf) { - + public void open(Configuration conf) { superstep = getIterationRuntimeContext().getSuperstepNumber(); - } @Override public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> value, - Collector<Tuple2<Integer, Integer>> out) throws Exception { + Collector<Tuple2<Integer, Integer>> out) { if (value.f0.f1 > superstep) { out.collect(value.f0);