Hi! I tried to re-implement the program you sent (the code in your mail is incomplete), but I could not trigger the exception. Could you send us the complete example?

Stephan

On Fri, Nov 28, 2014 at 3:18 PM, Maximilian Alber <[email protected]> wrote:
> Hi Flinksters!
>
> I try to write a BulkIteration. Somehow I get a cryptic error message, at
> least I have no clue what's wrong:
>
> Code:
>
> var width = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions)
>     * config.startWidth)) map {x => new Vector(0, x.values)}
> var update = env.fromCollection[Vector](Seq(Vector.ones(config.dimensions)
>     * 0.01F)) map {x => new Vector(1, x.values)}
> var lastGradient =
>     env.fromCollection[Vector](Seq(Vector.zeros(config.dimensions))) map {x =>
>     new Vector(2, x.values)}
>
> var stepSet = width union update union lastGradient
> stepSet = stepSet.iterate(config.gradientDescentIterations){
>   stepSet =>
>   var width = stepSet filter {_.id == 0}
>   var update = stepSet filter {_.id == 1}
>   var lastGradient = stepSet filter {_.id == 2}
>
>   val gradient = getGradient(X, residual, center, width)
>   val term = gradient * lastGradient
>   lastGradient = gradient
>
>   update = update.map(new RichMapFunction[Vector, Vector]{
>     var term: Vector = null
>     val minWidthUpdate = 0.00000001F
>     val maxWidthUpdate = 10.0F
>     override def open(config: Configuration) = {
>       term = getRuntimeContext.getBroadcastVariable("term").toList.head
>     }
>
>     def map(x: Vector) = {x.condMul(term.isLess(0),
>         0.5F).condMul(term.isGreater(0), 1.2F).clip(minWidthUpdate, maxWidthUpdate)}
>   }).withBroadcastSet(term, "term")
>   /*
>   width = width.map(new RichMapFunction[Vector, Vector]{
>     var update: Vector = null
>     var gradient: Vector = null
>     override def open(config: Configuration) = {
>       update =
>           getRuntimeContext.getBroadcastVariable("update").toList.head
>       gradient =
>           getRuntimeContext.getBroadcastVariable("gradient").toList.head
>     }
>
>     def map(x: Vector) = {(x + update * (gradient
>         sign)).clip(config.minWidth, config.maxWidth)}
>   }).withBroadcastSet(update, "update")withBroadcastSet(gradient,
>       "gradient")
>   */
>
>   width union update union lastGradient
> }
>
>
> Error:
>
> java.lang.IllegalStateException
>     at org.apache.flink.compiler.dag.BulkPartialSolutionNode.setCandidateProperties(BulkPartialSolutionNode.java:50)
>     at org.apache.flink.compiler.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:292)
>     at org.apache.flink.compiler.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:367)
>     at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:315)
>     at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>     at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>     at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>     at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:105)
>     at org.apache.flink.compiler.dag.BinaryUnionNode.getAlternativePlans(BinaryUnionNode.java:104)
>     at org.apache.flink.compiler.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:258)
>     at org.apache.flink.compiler.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:194)
>     at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:561)
>     at org.apache.flink.compiler.PactCompiler.compile(PactCompiler.java:466)
>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:197)
>     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:210)
>     at org.apache.flink.client.program.Client.run(Client.java:288)
>     at org.apache.flink.client.program.Client.run(Client.java:231)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:347)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:334)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1001)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1025)
>
> Thanks!
> Cheers,
> Max
