Hi Max,

I'd recommend using the DataSet[T].iterateWithTermination method instead. It has the following signature:

iterateWithTermination(maxIterations: Int)(stepFunction: (DataSet[T]) => (DataSet[T], DataSet[_])): DataSet[T]
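Applied to your example, a minimal sketch might look like the following. The object name TerminationSketch, the run helper, and the stopping rule (stop once no element is below a threshold of 100) are assumptions made up for illustration. Only iterateWithTermination itself and the data set from your snippet come from this thread.

```scala
import org.apache.flink.api.scala._

// Hypothetical wrapper object, used only to make the sketch self-contained.
object TerminationSketch {

  // Doubles every element in each iteration. The threshold-based stopping
  // rule is an assumption chosen for illustration, not part of the question.
  def run(threshold: Int = 100, maxIterations: Int = 10): Seq[Int] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

    val result = ds.iterateWithTermination(maxIterations) { current =>
      // First tuple element: the data set fed into the next iteration.
      val next = current.map(_ * 2)
      // Second tuple element: the termination criterion. The iteration
      // stops as soon as this data set is empty.
      val stillBelowThreshold = next.filter(_ < threshold)
      (next, stillBelowThreshold)
    }

    result.collect()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

No aggregator or convergence criterion has to be registered here, since the second data set returned by the step function takes over that role.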
As you can see, your step function has to return a tuple of data sets. The first tuple value is the result passed on to the next iteration. The second data set defines the convergence criterion: if it is empty, the iteration terminates. If it is not empty and the maximum number of iterations has not been exceeded, the next iteration is started.

Cheers,
Till

On Fri, Jul 17, 2015 at 3:43 PM, Maximilian Alber <alber.maximil...@gmail.com> wrote:

> Hi Flinksters,
>
> I try to use BulkIterations with a convergence criterion. Unfortunately,
> I'm not sure how to use them and I couldn't find a nice example.
>
> Here are two code snippets and the resulting error, maybe someone can help.
> I'm working on the current branch.
>
> Example1:
>
> if(true){
>   val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>
>   val agg = new LongSumAggregator;
>
>   val ds2 = ds.iterate(10)({
>     x =>
>
>     x map { y => y*2 }
>   }).registerAggregator("test", agg)
>   println(ds2)
>   //.registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
>
>   println(ds2.collect)
> }
>
> Error:
>
> Exception in thread "main" java.lang.UnsupportedOperationException: Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255 cannot have aggregators.
>   at org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
>   at Test$delayedInit$body.apply(test.scala:386)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>   at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:71)
>   at scala.App$$anonfun$main$1.apply(App.scala:71)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>   at scala.App$class.main(App.scala:71)
>   at Test$.main(test.scala:47)
>   at Test.main(test.scala)
> :run FAILED
>
> Example 2:
>
> if(true){
>   val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))
>
>   val agg = new LongSumAggregator;
>
>   val ds2 = ds.iterate(10)({
>     x =>
>
>     x map { y => y*2 }
>   }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
>
>   println(ds2.collect)
> }
>
> Error:
>
> :compileScala
> [ant:scalac] /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386: error: value registerAggregationConvergenceCriterion is not a member of org.apache.flink.api.scala.DataSet[Int]
> [ant:scalac] }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence)
> [ant:scalac] ^
> [ant:scalac] one error found
> :compileScala FAILED
>
> Thanks!
>
> Cheers,
> Max