Hi Truong,

I'm afraid what you're experiencing is to be expected. Currently, for loops
do not perform well in Flink because there is no support for caching
intermediate results yet. This feature has been requested quite often
lately, so maybe it will be added soon :)
Until then, I suggest you try implementing your logic using iterate or
iterateDelta.
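
To illustrate the iterate suggestion, here is a minimal sketch of a bulk
iteration in the Scala DataSet API. It is not a port of your program: the
1-D Double points, the (id, position) seeds, the sample data and the
NearestSeed class are all hypothetical stand-ins for your elided map()
functions, and the topDistance step is left out. Whether that step can be
computed and broadcast inside the step function is something you would need
to verify for your case.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

object IterateSketch {

  // Hypothetical stand-in for the elided map(): tags each point with the id
  // of its nearest seed and a count of 1, so sums and counts can be reduced.
  class NearestSeed extends RichMapFunction[Double, (Int, Double, Long)] {
    private var seeds: Seq[(Int, Double)] = _

    override def open(parameters: Configuration): Unit = {
      // Read the seeds that were attached with withBroadcastSet(..., "seeds").
      seeds = getRuntimeContext
        .getBroadcastVariable[(Int, Double)]("seeds").asScala.toSeq
    }

    override def map(p: Double): (Int, Double, Long) = {
      val (id, _) = seeds.minBy { case (_, s) => math.abs(s - p) }
      (id, p, 1L)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Assumed sample data; the real program reads both sets from files.
    val points: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 10.0, 11.0, 12.0)
    val initialSeeds: DataSet[(Int, Double)] = env.fromElements((0, 0.0), (1, 5.0))
    val maxIterations = 30

    // The step function is applied by the runtime up to maxIterations times
    // inside a single job, instead of a driver-side for loop.
    val finalSeeds = initialSeeds.iterate(maxIterations) { seeds =>
      points
        .map(new NearestSeed).withBroadcastSet(seeds, "seeds")
        .groupBy(0)
        .reduce((a, b) => (a._1, a._2 + b._2, a._3 + b._3)) // sum and count per seed
        .map(t => (t._1, t._2 / t._3))                      // new seed = mean position
    }

    finalSeeds.collect().foreach(println)
  }
}
```

The main difference from the for loop is that the loop body is handed to
Flink as a function, so the plan contains the step once rather than
maxIterations unrolled copies of it.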

Cheers,
-Vasia.

On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:

> Hi,
>
> I have a Flink program which is similar to the k-means algorithm. I use a
> normal for loop because Flink iterations do not allow computing
> intermediate results (in this case topDistance) within one iteration.
> The problem is that my program only runs when maxIterations is small. When
> maxIterations is big, the Flink jobs inside the for loop are not scheduled,
> deployed or executed. The program hangs forever without any exception,
> error or log message.
>
> I ran the program in both local and cluster environments and got the same
> issue. I also tried smaller inputs (points and seeds), with the same
> result.
>
> Does anybody have an idea what the problem is? (Maybe the for loop
> creates many Flink jobs?)
>
> Here is the pseudo-code of my program:
>
> DataSet[Point] points = env.readTextFile(inputPoints)
> DataSet[Point] seeds = env.readTextFile(inputSeeds)
> discardNumber: Int = 100
> maxIterations: Int = 20 // maxIterations = 30 hangs the program, and no
> // Flink job inside the for loop is deployed
>
> for(iteration <- 1 to maxIterations) {
>
>       val intermediateSeeds = points
>         .map()
>         .withBroadcastSet(seeds, "seeds")
>
>       // topDistance contains only one double value.
>       var topDistance = intermediateSeeds
>         .mapPartition()
>         .first(discardNumber)
>         .groupBy()
>         .reduceGroup()
>
>       val newSeeds = intermediateSeeds
>         .map()
>         .groupBy(0)
>         .reduce().withBroadcastSet(topDistance, "topDistance")
>         .map()
>
>       seeds = newSeeds
> }
>
> val finalResult = seeds.collect()
>
>
> Thanks,
> Truong
>
