Thanks! I made a workaround using a pseudo join with the workset. But now I'm back to the nested iteration issue. Is there any chance that this feature will be available in the next time(2-3 weeks)?
Cheers, Max On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <[email protected]> wrote: > Hi, > the problem is that your operations do not depend on the > iteration-step-dataset. Your code could be rewritten like this to make it > more obvious: > > val emptyDataSet = env.fromCollection[Vector](Seq()) > // here we call the function > val center = calcCenter(env, X, residual, randoms, -1) > > val centerX = (X subtV center) map {_ square} > val x = calcWidthHeight(env, centerX, residual, widthCandidates, > center) > val width = x._1 > val height = x._2 > > residual = residual - (getKernelVector(X, center, width) multV height) > > val centerOut = center map {x => new Vector(0, x.values)} > val widthOut = width map {x => new Vector(1, x.values)} > val heightOut = height map {x => new Vector(2, x.values)} > val stepModel = centerOut union widthOut union height > > // here the loop begins > val model = emptyDataSet.iterate(config.iterations){ > stepSet => > > stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{ > def map(x: Vector) = new > Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values) > })) > } > > model map { _ toString } writeAsText config.outFile > } > > This means that for the system these operations are considered to be > outside the loop, thus you don't have access to the IterationContext. > > Regards, > Aljoscha > > On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber < > [email protected]> wrote: > >> Hey Flinksters! >> >> ran into this error >> >> java.lang.IllegalStateException: This stub is not part of an iteration >> step function. >> >> below is my code, the concerning parts are marked. Is it a problem, that >> the stub is in a function that is called from the iteration step function? >> >> >> Code: >> >> ...... >> >> val emptyDataSet = env.fromCollection[Vector](Seq()) >> // here the loop begins >> val model = emptyDataSet.iterate(config.iterations){ >> stepSet => >> // here we call the function >> val center = calcCenter(env, X, residual, randoms, -1) >> >> val centerX = (X subtV center) map {_ square} >> val x = calcWidthHeight(env, centerX, residual, widthCandidates, >> center) >> val width = x._1 >> val height = x._2 >> >> residual = residual - (getKernelVector(X, center, width) multV >> height) >> >> val centerOut = center map {x => new Vector(0, x.values)} >> val widthOut = width map {x => new Vector(1, x.values)} >> val heightOut = height map {x => new Vector(2, x.values)} >> val stepModel = centerOut union widthOut union height >> >> stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{ >> def map(x: Vector) = new >> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, >> x.values) >> })) >> } >> >> model map { _ toString } writeAsText config.outFile >> } >> >> >> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: >> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] >> = { >> val residual_2 = residual * residual >> val randomValue = if(iteration >= 0) >> (randoms filter {_.id == iteration}) >> else >> // and this filter function causes the error >> (randoms.filter(new RichFilterFunction[Vector]{ >> def filter(x: Vector) = x.id == >> (getIterationRuntimeContext.getSuperstepNumber-1) >> })) >> val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize)) >> >> ..... >> >> The full errror: >> >> Error: The program execution failed: java.lang.IllegalStateException: >> This stub is not part of an iteration step function. >> at >> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59) >> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119) >> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118) >> at >> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47) >> at >> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79) >> at >> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78) >> at >> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176) >> at >> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) >> at java.lang.Thread.run(Thread.java:745) >> >> >> I append my program with the input files. To reproduce the error use >> following command line args, please replace in_file, random_file, >> width_candidates with the provided ones, and put for out_file the path you >> want to: >> >> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar >> out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100 >> width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30 >> multi_bump_boost=0 gradient_descent_iterations=30 cache=False >> start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08 >> max_width_update=10 >> >> Thanks! >> Cheers, >> Max >> > >
