Hi Truong,

I guess the problem is that you want to use topDistance as a broadcast set
inside the iteration? If I understand correctly this is a dataset with a
single value, right? Could you maybe compute it with an aggregator instead?
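
To illustrate what I mean, here's a plain-Scala sketch of the aggregator contract (this is not Flink code; the points, seeds, and the seed-update rule are invented for illustration): each superstep aggregates a single value, and the step function reads only the aggregate computed in the previous superstep, which is what getPreviousIterationAggregate() would give you inside iterate():

```scala
// Plain-Scala sketch of the aggregator idea -- NOT actual Flink API.
// In Flink, the step function registers an aggregator on the IterativeDataSet
// and each superstep can read only the aggregate of the *previous* superstep
// via getPreviousIterationAggregate(). The simulation below mirrors that
// contract; the data and the seed-update rule are made up for illustration.
object AggregatorSketch {

  private def distance(p: Double, s: Double): Double = math.abs(p - s)

  // Runs `supersteps` iterations; returns the final seeds and the last
  // aggregated topDistance.
  def run(points: Seq[Double], seeds0: Seq[Double],
          supersteps: Int): (Seq[Double], Double) = {
    var seeds = seeds0
    var prevTopDistance = Double.PositiveInfinity // no aggregate before step 1

    for (_ <- 1 to supersteps) {
      // Each point "adds" to the aggregator: here the aggregate is the
      // maximum distance from any point to its nearest seed.
      val topDistance = points.map(p => seeds.map(distance(p, _)).min).max

      // The seed update reads only the PREVIOUS superstep's aggregate,
      // so no single-value broadcast set is needed inside the iteration.
      seeds = seeds.map { s =>
        val near = points.filter(p => distance(p, s) <= prevTopDistance)
        if (near.isEmpty) s else near.sum / near.size
      }
      prevTopDistance = topDistance
    }
    (seeds, prevTopDistance)
  }

  def main(args: Array[String]): Unit = {
    val (finalSeeds, lastTop) = run(Seq(0.0, 1.0, 5.0, 6.0, 20.0),
                                    Seq(0.0, 10.0), 3)
    println(s"seeds = $finalSeeds, topDistance = $lastTop")
  }
}
```

In an actual job you would register the aggregator (e.g. a DoubleSumAggregator) on the IterativeDataSet and read it from the iteration runtime context inside a rich function, so the value never has to leave the iteration as a separate DataSet.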

-Vasia.

On 5 July 2016 at 21:48, Nguyen Xuan Truong <truongn...@gmail.com> wrote:

> Hi Vasia,
>
> Thank you very much for your explanation :). When running with a small
> maxIterations, the job graph that Flink executed was optimal. However, when
> maxIterations was large, Flink took a very long time to generate the job
> graph. The actual time to execute the jobs was very fast, but the time to
> optimize and schedule them was slow.
>
> Regarding your suggestion, I didn't use iterate/iterateDelta because I
> need to access the intermediate results within an iteration (the
> topDistance in my pseudo-code). As you said before, Flink does not support
> that feature, so I wondered if you have a workaround for iterate or
> iterateDelta?
>
> Thanks,
> Truong
>
> On Tue, Jul 5, 2016 at 8:46 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hi Truong,
>>
>> I'm afraid what you're experiencing is to be expected. Currently, for
>> loops do not perform well in Flink, since there is no support for caching
>> intermediate results yet. This has been a frequently requested feature
>> lately, so maybe it will be added soon :)
>> Until then, I suggest you try implementing your logic using iterate or
>> iterateDelta.
>>
>> Cheers,
>> -Vasia.
>>
>> On 5 July 2016 at 17:11, Nguyen Xuan Truong <truongn...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink program which is similar to the k-means algorithm. I use a
>>> normal iteration (for loop) because Flink's iteration does not allow
>>> computing intermediate results (in this case the topDistance) within one
>>> iteration. The problem is that my program only runs when maxIterations is
>>> small. When maxIterations is big, the Flink jobs inside the for loop are not
>>> scheduled, deployed, or executed. The program hangs forever without any
>>> exception, error, or log message.
>>>
>>> I ran the program in both local and cluster environments and had the
>>> same issue. I also tried with smaller inputs (points and seeds), with the
>>> same result.
>>>
>>> Does anybody have an idea what the problem is? (Maybe the for loop
>>> creates many Flink jobs?)
>>>
>>> Here is the pseudo-code of my program:
>>>
>>> val points: DataSet[Point] = env.readTextFile(inputPoints)
>>> var seeds: DataSet[Point] = env.readTextFile(inputSeeds)
>>> val discardNumber: Int = 100
>>> val maxIterations: Int = 20 // maxIterations = 30 hangs the program and
>>> // no Flink job inside the for loop is deployed
>>>
>>> for(iteration <- 1 to maxIterations) {
>>>
>>>       val intermediateSeeds = points
>>>         .map()
>>>         .withBroadcastSet(seeds, "seeds")
>>>
>>>       // topDistance contains only one double value.
>>>       val topDistance = intermediateSeeds
>>>         .mapPartition()
>>>         .first(discardNumber)
>>>         .groupBy()
>>>         .reduceGroup()
>>>
>>>       val newSeeds = intermediateSeeds
>>>         .map()
>>>         .groupBy(0)
>>>         .reduce().withBroadcastSet(topDistance, "topDistance")
>>>         .map()
>>>
>>>       seeds = newSeeds
>>> }
>>>
>>> val finalResult = seeds.collect()
>>>
>>>
>>> Thanks,
>>> Truong
>>>
>>
>>
>
