Thanks!

On Wed, Dec 10, 2014 at 4:35 PM, Stephan Ewen <[email protected]> wrote:
> Hi!
>
> Yes, that is a known limitation. It is sort of straightforward to resolve,
> but will take a bit of time. I will try and get to it until the end of the
> week.
>
> Stephan
>
>
> On Tue, Dec 9, 2014 at 11:21 AM, Maximilian Alber <[email protected]> wrote:
>
>> Hi flinksters,
>>
>> I'm really close to the end (at least I hope so), but I still have some
>> issues.
>> Writing my final loop I got this error:
>>
>> org.apache.flink.compiler.CompilerException: An error occurred while
>> translating the optimized plan to a nephele JobGraph: An error occurred
>> while translating the optimized plan to a nephele JobGraph: Error: It is
>> currently not supported to union between dynamic and static path in an
>> iteration.
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:567)
>>     at org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator.postVisit(NepheleJobGraphGenerator.java:96)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>     at org.apache.flink.compiler.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:196)
>>
>> I guess that I should use just the loop step set to create the next step
>> set?
>>
>> Here is the code:
>>
>> def createPlanFullIterations(env: ExecutionEnvironment) = {
>>   val tmp = env readTextFile config.inFile map {Vector.parseFromSVMLightString (config.dimensions, _)}
>>   val X = tmp map {_._1}
>>   var residual = tmp map {_._2}
>>   val randoms = env readTextFile config.randomFile map {Vector.parseFromString(_)}
>>   val widthCandidates: DataSet[Vector] = if(config.widthCandidatesFile != null)
>>     env readTextFile config.widthCandidatesFile map {Vector.parseFromString(config.dimensions, _)}
>>   else
>>     null
>>
>>   val emptyDataSet = env.fromCollection[Vector](Seq())
>>   val model = emptyDataSet.iterate(config.iterations){
>>     stepSet =>
>>       val center = calcCenter(env, X, residual, randoms, -1)
>>
>>       val x = calcWidthHeight(env, X, residual, widthCandidates, center)
>>       val width = x._1
>>       val height = x._2
>>
>>       residual = residual - (getKernelVector(X, center, width).map(new RichMapFunction[Vector, Vector]{
>>         var height: Vector = null
>>         override def open(config: Configuration) = {
>>           height = getRuntimeContext.getBroadcastVariable("height").toList.head
>>         }
>>         def map(x: Vector) = {x * height}
>>       }).withBroadcastSet(height, "height"))
>>
>>       val centerOut = center map {x => new Vector(0, x.values)}
>>       val widthOut = width map {x => new Vector(1, x.values)}
>>       val heightOut = height map {x => new Vector(2, x.values)}
>>       val stepModel = centerOut union widthOut union height
>>
>>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>         def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>       }))
>>   }
>>
>>   model map { _ toString } writeAsText config.outFile
>> }
>>
>>
>> thanks!
>> Cheers,
>> Max
>>
>
>
