Hi Vasia,

You are right about topDistance: it is a dataset holding only one double value. I already looked at the Aggregator, but I can only get the value of an aggregator in the next iteration, and my problem is a bit tricky because topDistance controls how newSeeds is calculated within the same iteration. I managed to speed up the normal for loop a bit, but I am still curious whether there is any workaround that would let me use the native Flink iteration.
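To make it concrete, here is roughly the shape I have in mind — a minimal, untested sketch, not my actual program: Point and all of the distance/selection logic are placeholders, and the single topDistance value is attached with crossWithTiny instead of a broadcast set, since my understanding is that broadcasting an intermediate result inside an iteration is what causes trouble. I have not verified that the optimizer accepts this dataflow inside iterate:

import org.apache.flink.api.scala._

// Point is a stand-in for the real record type (hypothetical).
case class Point(x: Double, y: Double) {
  def distanceTo(that: Point): Double =
    math.sqrt((x - that.x) * (x - that.x) + (y - that.y) * (y - that.y))
}

object NativeIterationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy inputs; the real program reads these from files.
    val points = env.fromElements(Point(0, 0), Point(1, 2), Point(9, 9))
    val initialSeeds = env.fromElements(Point(1, 1), Point(8, 8))
    val maxIterations = 20

    val finalSeeds = initialSeeds.iterate(maxIterations) { currentSeeds =>
      // Distance from every point to its nearest current seed.
      val nearest = points
        .cross(currentSeeds) { (p, s) => (p, p.distanceTo(s)) }
        .groupBy(0)
        .reduce((a, b) => if (a._2 < b._2) a else b)

      // Single-value dataset standing in for the real topDistance
      // computation (mapPartition / first / reduceGroup in the original).
      val topDistance = nearest.map(_._2).reduce(math.max(_, _))

      // Use topDistance in the SAME superstep: crossWithTiny attaches the
      // single value to every record, avoiding a broadcast set entirely.
      nearest
        .crossWithTiny(topDistance) { (pair, top) => (pair._1, pair._2, top) }
        .filter(t => t._2 <= t._3) // placeholder for the real seed update
        .map(_._1)
    }

    finalSeeds.print()
  }
}

If something like this is legal inside iterate, topDistance would be usable in the same superstep, which is exactly what the broadcast set was doing in my for-loop version.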
Thanks,
Truong

On Thu, Jul 7, 2016 at 10:17 AM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> Hi Truong,
>
> I guess the problem is that you want to use topDistance as a broadcast
> set inside the iteration? If I understand correctly, this is a dataset
> with a single value, right? Could you maybe compute it with an aggregator
> instead?
>
> -Vasia.
>
> On 5 July 2016 at 21:48, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
>
>> Hi Vasia,
>>
>> Thank you very much for your explanation :). When running with a small
>> maxIterations, the job graph that Flink executed was optimal. However,
>> when maxIterations was large, Flink took a very long time to generate
>> the job graph. The actual time to execute the jobs was very fast, but
>> the time to optimize and schedule the jobs was slow.
>>
>> Regarding your suggestion, I didn't use iterate/iterateDelta because I
>> need to access the intermediate results within an iteration (the
>> topDistance in my pseudo-code). As you said before, Flink does not
>> support that feature, so I wondered if you have a workaround for
>> iterate or iterateDelta?
>>
>> Thanks,
>> Truong
>>
>> On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri
>> <vasilikikala...@gmail.com> wrote:
>>
>>> Hi Truong,
>>>
>>> I'm afraid what you're experiencing is to be expected. Currently, for
>>> loops do not perform well in Flink, since there is no support for
>>> caching intermediate results yet. This has been a quite often
>>> requested feature lately, so maybe it will be added soon :)
>>> Until then, I suggest you try implementing your logic using iterate or
>>> iterateDelta.
>>>
>>> Cheers,
>>> -Vasia.
>>>
>>> On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Flink program which is similar to the k-means algorithm. I
>>>> use a normal iteration (for loop) because the Flink iteration does
>>>> not allow computing intermediate results (in this case topDistance)
>>>> within one iteration. The problem is that my program only runs when
>>>> maxIterations is small. When maxIterations is big, the Flink jobs
>>>> inside the for loop are not scheduled, deployed, or executed. The
>>>> program hangs forever without any exception, error, or log message.
>>>>
>>>> I ran the program in both local and cluster environments and had the
>>>> same issue. I also tried with smaller inputs (points and seeds), with
>>>> the same result.
>>>>
>>>> Does anybody have an idea what the problem is? (Maybe the for loop
>>>> creates too many Flink jobs?)
>>>>
>>>> Here is the pseudo-code of my program:
>>>>
>>>> val points: DataSet[Point] = env.readTextFile(inputPoints)
>>>> var seeds: DataSet[Point] = env.readTextFile(inputSeeds)
>>>> val discardNumber: Int = 100
>>>> val maxIterations: Int = 20 // maxIterations = 30 hangs the program;
>>>> // no Flink job inside the for loop is deployed
>>>>
>>>> for (iteration <- 1 to maxIterations) {
>>>>
>>>>   val intermediateSeeds = points
>>>>     .map()
>>>>     .withBroadcastSet(seeds, "seeds")
>>>>
>>>>   // topDistance contains only one double value.
>>>>   val topDistance = intermediateSeeds
>>>>     .mapPartition()
>>>>     .first(discardNumber)
>>>>     .groupBy()
>>>>     .reduceGroup()
>>>>
>>>>   val newSeeds = intermediateSeeds
>>>>     .map()
>>>>     .groupBy(0)
>>>>     .reduce().withBroadcastSet(topDistance, "topDistance")
>>>>     .map()
>>>>
>>>>   seeds = newSeeds
>>>> }
>>>>
>>>> val finalResult = seeds.collect()
>>>>
>>>> Thanks,
>>>> Truong
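P.S. In case it helps anyone else hitting the same hang: one blunt workaround — assuming the problem really is a single job graph that accumulates the operators of every pass until collect() — is to materialize the seeds on the client after each pass, so that every iteration is planned and executed as its own small job. A rough sketch with placeholder step logic (Point and the map body are stand-ins for the real program):

import org.apache.flink.api.scala._

// Point is a stand-in for the real record type (hypothetical).
case class Point(x: Double, y: Double)

object PerIterationJobsSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Toy input; the real program reads points from a file. It is re-read
    // on every pass, because nothing is cached between jobs.
    val points = env.fromElements(Point(0, 0), Point(1, 2), Point(9, 9))

    // Seeds live on the client between iterations.
    var seedsLocal: Seq[Point] = Seq(Point(1, 1), Point(8, 8))
    val maxIterations = 30

    for (iteration <- 1 to maxIterations) {
      val seeds = env.fromCollection(seedsLocal)

      // Placeholder for the real per-iteration dataflow (intermediateSeeds,
      // topDistance, newSeeds); the real step would read the "seeds"
      // broadcast set from a rich function.
      val newSeeds = points
        .map(p => p)
        .withBroadcastSet(seeds, "seeds")

      // collect() submits and runs ONE small job per iteration, so the plan
      // handed to the optimizer stays constant-size instead of accumulating.
      seedsLocal = newSeeds.collect()
    }

    println(seedsLocal)
  }
}

The trade-off is that nothing is cached between jobs, so the points input is re-read on every pass, but the plan handed to the optimizer stays constant in size no matter how large maxIterations gets.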