[FLINK-1532] [tests] Fix spurious failure in AggregatorsITCase (plus minor 
cleanups)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22b71c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22b71c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22b71c

Branch: refs/heads/master
Commit: 0a22b71c887749d297e0f00f4bbdc4af58832a48
Parents: 1dafd81
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 13 12:22:35 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 13 17:11:13 2015 +0100

----------------------------------------------------------------------
 .../aggregators/AggregatorsITCase.java          | 25 ++++++++++----------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a22b71c/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 63cac17..9dcf6fc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -37,16 +37,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test the functionality of aggregators in bulk and delta iterative cases.
- *
  */
 @RunWith(Parameterized.class)
 public class AggregatorsITCase extends MultipleProgramsTestBase {
@@ -287,7 +286,6 @@ public class AggregatorsITCase extends 
MultipleProgramsTestBase {
 
                @Override
                public void open(Configuration conf) {
-
                        aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                }
 
@@ -319,10 +317,16 @@ public class AggregatorsITCase extends 
MultipleProgramsTestBase {
        @SuppressWarnings("serial")
        public static final class TupleMakerMap extends 
RichMapFunction<Integer, Tuple2<Integer, Integer>> {
 
+               private Random rnd;
+
                @Override
-               public Tuple2<Integer, Integer> map(Integer value) throws 
Exception {
-                       Random ran = new Random();
-                       Integer nodeId = Integer.valueOf(ran.nextInt(100000));
+               public void open(Configuration parameters){
+                       rnd = new Random(0xC0FFEBADBEEFDEADL + 
getRuntimeContext().getIndexOfThisSubtask());
+               }
+
+               @Override
+               public Tuple2<Integer, Integer> map(Integer value) {
+                       Integer nodeId = Integer.valueOf(rnd.nextInt(100000));
                        return new Tuple2<Integer, Integer>(nodeId, value);
                }
 
@@ -337,7 +341,6 @@ public class AggregatorsITCase extends 
MultipleProgramsTestBase {
 
                @Override
                public void open(Configuration conf) {
-
                        aggr = 
getIterationRuntimeContext().getIterationAggregator(NEGATIVE_ELEMENTS_AGGR);
                        superstep = 
getIterationRuntimeContext().getSuperstepNumber();
 
@@ -366,15 +369,13 @@ public class AggregatorsITCase extends 
MultipleProgramsTestBase {
                private int superstep;
 
                @Override
-               public void open(Configuration conf) { 
-
+               public void open(Configuration conf) {
                        superstep = 
getIterationRuntimeContext().getSuperstepNumber();
-
                }
 
                @Override
                public void flatMap(Tuple2<Tuple2<Integer, Integer>, 
Tuple2<Integer, Integer>> value,
-                               Collector<Tuple2<Integer, Integer>> out) throws 
Exception {
+                               Collector<Tuple2<Integer, Integer>> out) {
 
                        if (value.f0.f1  > superstep) {
                                out.collect(value.f0);

Reply via email to