Hi Truong,

I'm afraid what you're experiencing is to be expected. For loops currently do not perform well in Flink, since there is no support for caching intermediate results yet. This has been quite a frequently requested feature lately, so maybe it will be added soon :) Until then, I suggest you try implementing your logic with Flink's native iterations, i.e. iterate or iterateDelta.
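To give an idea of what I mean, here is a rough sketch of how your loop body might be folded into a single bulk iteration with the Scala DataSet API. The function names (SelectNearestSeed, DistanceExtractor, etc.) are placeholders for your own map/reduce logic, and I have not run this against your job, so please treat it as a starting point rather than a tested solution:

```scala
// Sketch only: one bulk iteration instead of a driver-side for loop.
// All operator bodies are hypothetical stand-ins for your pseudo-code.
val finalSeeds = seeds.iterate(maxIterations) { currentSeeds =>

  val intermediateSeeds = points
    .map(new SelectNearestSeed)           // placeholder map function
    .withBroadcastSet(currentSeeds, "seeds")

  // topDistance is recomputed inside the step function, so no
  // intermediate result has to be cached across iterations.
  val topDistance = intermediateSeeds
    .mapPartition(new DistanceExtractor)  // placeholder
    .first(discardNumber)
    .groupBy(0)
    .reduceGroup(new MaxDistance)         // placeholder

  // The result of the step function becomes the seeds of the next round.
  intermediateSeeds
    .map(new ToSeedContribution)          // placeholder
    .groupBy(0)
    .reduce(new SumContributions)         // placeholder
    .withBroadcastSet(topDistance, "topDistance")
    .map(new ToSeed)                      // placeholder
}

val finalResult = finalSeeds.collect()
```

Because the whole iteration is one Flink job, the plan no longer grows with the number of rounds the way it does when you submit operators from a driver-side loop.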
Cheers,
-Vasia.

On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
> Hi,
>
> I have a Flink program which is similar to the k-means algorithm. I use a
> normal iteration (for loop) because Flink's iteration does not allow me to
> compute intermediate results (in this case topDistance) within one
> iteration. The problem is that my program only runs when maxIterations is
> small. When maxIterations is big, the Flink jobs inside the for loop are
> not scheduled, deployed, or executed. The program hangs forever without any
> exception, error, or log message.
>
> I ran the program on both local and cluster environments and had the same
> issue. I also tried with smaller inputs (points and seeds), with the same
> issue.
>
> Does anybody have an idea what the problem is? (Maybe the for loop creates
> too many Flink jobs?)
>
> Here is the pseudo-code of my program:
>
> val points: DataSet[Point] = env.readTextFile(inputPoints).map(...)
> var seeds: DataSet[Point] = env.readTextFile(inputSeeds).map(...)
> val discardNumber: Int = 100
> val maxIterations: Int = 20 // maxIterations = 30 hangs the program and no
> // Flink job inside the for loop is deployed
>
> for (iteration <- 1 to maxIterations) {
>
>   val intermediateSeeds = points
>     .map(...)
>     .withBroadcastSet(seeds, "seeds")
>
>   // topDistance contains only one double value.
>   val topDistance = intermediateSeeds
>     .mapPartition(...)
>     .first(discardNumber)
>     .groupBy(...)
>     .reduceGroup(...)
>
>   val newSeeds = intermediateSeeds
>     .map(...)
>     .groupBy(0)
>     .reduce(...).withBroadcastSet(topDistance, "topDistance")
>     .map(...)
>
>   seeds = newSeeds
> }
>
> val finalResult = seeds.collect()
>
> Thanks,
> Truong