Hi Truong,

I guess the problem is that you want to use topDistance as a broadcast set inside the iteration? If I understand correctly, this is a dataset with a single value, right? Could you maybe compute it with an aggregator instead?
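To make the idea concrete, here is a rough, untested sketch of the aggregator mechanics. It uses a DoubleSumAggregator only to keep the example short; for your case you would probably write a custom Aggregator that tracks the top distance instead. The Point type, its distance field, and the DistanceMapper name are placeholders I made up, not anything from your program. One caveat: an aggregator's value only becomes visible in the *next* superstep, via getPreviousIterationAggregate.

    import org.apache.flink.api.common.aggregators.DoubleSumAggregator
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.types.DoubleValue

    // Placeholder element type, standing in for the Point in the pseudo-code.
    case class Point(id: Int, distance: Double)

    class DistanceMapper extends RichMapFunction[Point, Point] {
      private var agg: DoubleSumAggregator = _

      override def open(parameters: Configuration): Unit = {
        // The aggregator must be registered on the iteration under the same
        // name ("topDistance" here) when the iteration is set up; in the Java
        // API that is IterativeDataSet#registerAggregator.
        agg = getIterationRuntimeContext
          .getIterationAggregator[DoubleSumAggregator]("topDistance")

        // Value aggregated during the previous superstep; there is no
        // previous value yet in the first superstep.
        val previous: DoubleValue = getIterationRuntimeContext
          .getPreviousIterationAggregate[DoubleValue]("topDistance")
      }

      override def map(p: Point): Point = {
        agg.aggregate(p.distance) // every element contributes to the aggregate
        p
      }
    }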
-Vasia.

On 5 July 2016 at 21:48, Nguyen Xuan Truong <truongn...@gmail.com> wrote:

> Hi Vasia,
>
> Thank you very much for your explanation :). When running with a small
> maxIterations, the job graph that Flink executed was optimal. However, when
> maxIterations was large, Flink took a very long time to generate the job
> graph. The actual time to execute the jobs was very fast, but the time to
> optimize and schedule the jobs was slow.
>
> Regarding your suggestion, I didn't use iterate/iterateDelta because I
> need to access the intermediate results within an iteration (the
> topDistance in my pseudo-code). As you said before, Flink does not support
> that feature, so I wondered if you have a workaround for iterate or
> iterateDelta?
>
> Thanks,
> Truong
>
> On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
>
>> Hi Truong,
>>
>> I'm afraid what you're experiencing is to be expected. Currently, for
>> loops do not perform well in Flink since there is no support for caching
>> intermediate results yet. This has been a quite often requested feature
>> lately, so maybe it will be added soon :)
>> Until then, I suggest you try implementing your logic using iterate or
>> iterateDelta.
>>
>> Cheers,
>> -Vasia.
>>
>> On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink program which is similar to the k-means algorithm. I use
>>> a normal iteration (for loop) because Flink's iteration does not allow
>>> computing intermediate results (in this case the topDistance) within one
>>> iteration. The problem is that my program only runs when maxIterations is
>>> small. When maxIterations is big, the Flink jobs inside the for loop are
>>> not scheduled, deployed, or executed. The program hangs forever without
>>> any exception, error, or log message.
>>>
>>> I ran the program on both local and cluster environments and hit the
>>> same issue. I also tried with smaller inputs (points and seeds), with the
>>> same result.
>>>
>>> Does anybody have an idea what the problem is? (Maybe the for loop
>>> creates too many Flink jobs?)
>>>
>>> Here is the pseudo-code of my program:
>>>
>>> val points: DataSet[Point] = env.readTextFile(inputPoints)
>>> var seeds: DataSet[Point] = env.readTextFile(inputSeeds)
>>> val discardNumber: Int = 100
>>> val maxIterations: Int = 20 // maxIterations = 30 hangs the program and
>>> // no Flink job inside the for loop is deployed
>>>
>>> for (iteration <- 1 to maxIterations) {
>>>
>>>   val intermediateSeeds = points
>>>     .map(...)
>>>     .withBroadcastSet(seeds, "seeds")
>>>
>>>   // topDistance contains only one double value.
>>>   val topDistance = intermediateSeeds
>>>     .mapPartition(...)
>>>     .first(discardNumber)
>>>     .groupBy(...)
>>>     .reduceGroup(...)
>>>
>>>   val newSeeds = intermediateSeeds
>>>     .map(...)
>>>     .groupBy(0)
>>>     .reduce(...)
>>>     .withBroadcastSet(topDistance, "topDistance")
>>>     .map(...)
>>>
>>>   seeds = newSeeds
>>> }
>>>
>>> val finalResult = seeds.collect()
>>>
>>> Thanks,
>>> Truong
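PS for the archives: below is a rough, untested skeleton of how the driver-side loop above could collapse into a single bulk iteration, so the plan stays the same size however large maxIterations gets. With a for loop, every pass appends all operators to the plan again, which is what overwhelms the optimizer. The Point schema and the function bodies are placeholders for the real logic, and the "topDistance" aggregator sketched earlier would take the place of the broadcast set computed inside the step.

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._

    // Placeholder schema; the real Point type goes here.
    case class Point(id: Int, x: Double, y: Double)

    object SeedSelection {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Stand-ins for env.readTextFile(inputPoints) / env.readTextFile(inputSeeds).
        val points: DataSet[Point] = env.fromElements(Point(1, 0.0, 0.0))
        val seeds: DataSet[Point] = env.fromElements(Point(2, 1.0, 1.0))
        val maxIterations = 20

        // One bulk iteration instead of a driver-side for loop: the step
        // function below is applied maxIterations times over a fixed-size plan.
        val finalSeeds = seeds.iterate(maxIterations) { currentSeeds =>
          points
            // A RichMapFunction is needed to actually read the broadcast set;
            // the identity body stands in for the real assignment logic.
            .map(new RichMapFunction[Point, Point] {
              override def map(p: Point): Point = p
            })
            .withBroadcastSet(currentSeeds, "seeds")
            .groupBy(_.id)
            .reduce((a, b) => a) // stands in for the real seed-update logic
        }

        println(finalSeeds.collect())
      }
    }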