Hi Flinksters,

I try to use BulkIterations with a convergence criterion. Unfortunately,
I'm not sure how to use them and I couldn't find a nice example.

Here are two code snippets and the resulting error, maybe someone can help.
I'm working on the current branch.

Example1:

  if(true){
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

    val agg = new LongSumAggregator;

    val ds2 = ds.iterate(10)({
      x =>

      x map { y => y*2 }
    }).registerAggregator("test", agg)
    println(ds2)
    //.registerAggregationConvergenceCriterion("test", agg, new
LongZeroConvergence)

    println(ds2.collect)
  }

Error:

Exception in thread "main" java.lang.UnsupportedOperationException:
Operator org.apache.flink.api.java.operators.BulkIterationResultSet@9a2c255
cannot have aggregators.
    at
org.apache.flink.api.scala.DataSet.registerAggregator(DataSet.scala:194)
    at Test$delayedInit$body.apply(test.scala:386)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at Test$.main(test.scala:47)
    at Test.main(test.scala)
:run FAILED



Example 2:


  if(true){
    val ds = env.fromCollection(Seq(1, 2, 3, 4, 5, 6, 7))

    val agg = new LongSumAggregator;

    val ds2 = ds.iterate(10)({
      x =>

      x map { y => y*2 }
    }).registerAggregator("test",
agg).registerAggregationConvergenceCriterion("test", agg, new
LongZeroConvergence)

    println(ds2.collect)
  }


Error:

:compileScala
[ant:scalac]
/media/alber/datadisk/work/devel/flink_tutorial/code/test/src/main/scala/test.scala:386:
error: value registerAggregationConvergenceCriterion is not a member of
org.apache.flink.api.scala.DataSet[Int]
[ant:scalac]     }).registerAggregator("test",
agg).registerAggregationConvergenceCriterion("test", agg, new
LongZeroConvergence)
[ant:scalac]                                        ^
[ant:scalac] one error found
:compileScala FAILED



Thanks!

Cheers,
Max

Reply via email to