Hi Flinksters, I try to use BulkIterations with a convergence criterion. Unfortunately, I'm not sure how to use them and I couldn't find a nice example.
Here are two code snippets and the resulting error, maybe someone can help. I'm working on the current branch. Example1: if(true){ val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7)) val agg = new LongSumAggregator; val ds2 = ds.iterate(10)({ x => x map { y => y*2 } }).registerAggregator("test", agg) println(ds2) //.registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence) println(ds2.collect) } Error: Exception in thread "main" java.lang.UnsupportedOperationException: Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255 cannot have aggregators. at org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194) at Test$delayedInit$body.apply(test.scala:386) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at Test$.main(test.scala:47) at Test.main(test.scala) :run FAILED Example 2: if(true){ val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7)) val agg = new LongSumAggregator; val ds2 = ds.iterate(10)({ x => x map { y => y*2 } }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence) println(ds2.collect) } Error: :compileScala [ant:scalac] /media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386: error: value registerAggregationConvergenceCriterion is not a member of org.apache.flink.api.scala.DataSet[Int] [ant:scalac] }).registerAggregator("test", agg).registerAggregationConvergenceCriterion("test", agg, new LongZeroConvergence) [ant:scalac] ^ [ant:scalac] one error found :compileScala FAILED Thanks! Cheers, Max