Usually, the while loop solution should perform much worse since each new
iteration re-executes all of the previous iterations' steps without
persisting the intermediate results. Thus, it should have quadratic
complexity in terms of iteration step operations instead of linear
complexity. Additionally, the while loop will suffer from memory
fragmentation because of the explicit DAG unrolling.
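
One way to persist the intermediate results in the while loop variant would
be to materialize the state after each pass and to start the next pass from a
fresh source. A rough, untested sketch (reusing the names from the code
further down in this thread):

State current = initialState(m, initweights, 0, new double[initweights.length]);
DataSet<State> statedataset = env.fromElements(current);
int cnt = 0;
do {
   statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
      .reduce(accumulate)
      .map(new NormLossGradient(datasize))
      .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
      .map(new LBFGS());

   // Materialize this iteration's result and rebuild the DataSet from it, so
   // that the next job's plan does not contain all previous iterations.
   current = statedataset.collect().get(0);
   statedataset = env.fromElements(current);
   cnt++;
} while (cnt < niter && current.getIflag()[0] != 0);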

I agree with Theo that access to the full code would help a lot to pinpoint
the problem.

Cheers,
Till

On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Have you tried profiling the application to see where most of the time is
> spent during the runs?
>
> If most of the time is spent reading in the data maybe any difference
> between the two methods is being obscured.
>
> --
> Sent from a mobile device. May contain autocorrect errors.
>
> On Sep 6, 2016 4:55 PM, "Greg Hogan" <c...@greghogan.com> wrote:
>
>> Hi Dan,
>>
>> Flink currently allocates each task slot an equal portion of managed
>> memory. I don't know the best way to count task slots.
>>   https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources
>>
>> If you assign the TaskManagers less memory, Linux will use the remaining
>> memory to cache spill files.
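>>
>> For example, in flink-conf.yaml (just a sketch; the values are illustrative
>> and would have to be tuned to your nodes):
>>
>>   taskmanager.numberOfTaskSlots: 4
>>   taskmanager.heap.mb: 24576        # smaller TM heap leaves RAM for the OS page cache
>>   taskmanager.memory.fraction: 0.7  # fraction of the heap used as managed memory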
>>
>> Greg
>>
>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de>
>> wrote:
>>
>>> Hi Greg,
>>>
>>> thanks for your response!
>>>
>>> I just had a look and realized that it's actually only about 85 GB of data.
>>> Sorry for the wrong information.
>>>
>>> It's read from a CSV file on the master node's local file system. The 8
>>> nodes have more than 40 GB of available memory each, and since the data is
>>> equally distributed I assume there should be no need to spill anything to
>>> disk.
>>>
>>> There are 9 iterations.
>>>
>>> Is it possible that the data is repeatedly distributed with Flink
>>> Iterations as well? Or, the other way around: might it be that Flink
>>> somehow "remembers" that the data is already distributed, even for the
>>> while loop?
>>>
>>> -Dan
>>>
>>>
>>>
>>> Am 02.09.2016 um 16:39 schrieb Greg Hogan:
>>>
>>> Hi Dan,
>>>
>>> Where are you reading the 200 GB of "data" from? How much memory is
>>> available per node? If the DataSet is read from a distributed filesystem,
>>> and if Flink must spill to disk during the iterations, then I wouldn't
>>> expect much difference. About how many iterations run in the 30 minutes? I
>>> don't know that this is reported explicitly, but if your convergence
>>> function only has one input record per iteration, then the reported record
>>> total is the iteration count.
>>>
>>> One other thought: we should soon have support for object reuse with
>>> arrays (FLINK-3695). This would be implemented as DoubleValueArray or
>>> ValueArray<DoubleValue> rather than double[], but it would be interesting
>>> to test for a change in performance.
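>>>
>>> As a rough illustration of the object reuse idea (this is not the
>>> FLINK-3695 API, just an untested sketch with the existing DoubleValue
>>> type):
>>>
>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>> import org.apache.flink.types.DoubleValue;
>>>
>>> // With env.getConfig().enableObjectReuse(), Flink may pass the same object
>>> // instances into the reduce function repeatedly, so mutating the first
>>> // argument avoids allocating a new DoubleValue per call.
>>> public class SumReducer implements ReduceFunction<DoubleValue> {
>>>    @Override
>>>    public DoubleValue reduce(DoubleValue a, DoubleValue b) {
>>>       a.setValue(a.getValue() + b.getValue());
>>>       return a;
>>>    }
>>> }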
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> for my bachelor thesis I'm testing an implementation of the L-BFGS
>>>> algorithm with Flink Iterations against a version without Flink
>>>> Iterations that uses a plain while loop instead. Both programs use the
>>>> same Map and Reduce transformations in each iteration. I expected the
>>>> Flink Iterations version to scale better with increasing size of the
>>>> input data set. However, the measured results on an IBM Power cluster are
>>>> very similar for both versions, e.g. around 30 minutes for 200 GB of
>>>> data. The cluster has 8 nodes, is configured with 4 slots per node, and I
>>>> used a total parallelism of 32.
>>>> In every iteration of the while loop a new Flink job is started, and I
>>>> thought that the data would also be distributed over the network again in
>>>> each iteration, which should consume a significant and measurable amount
>>>> of time. Is that assumption wrong, or what computational overhead of the
>>>> Flink Iterations is cancelling out this disadvantage?
>>>> I include the relevant part of both programs and also attach the
>>>> generated execution plans.
>>>> Thank you for any ideas, as I could not find much about this issue in the
>>>> Flink docs.
>>>>
>>>> Best, Dan
>>>>
>>>> *Flink Iterations:*
>>>>
>>>> DataSet<double[]> data = ...
>>>>
>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>
>>>> // start of iteration section
>>>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>>>
>>>> DataSet<State> statewithnewlossgradient =
>>>>    data.map(difffunction).withBroadcastSet(loop, "state")
>>>>       .reduce(accumulate)
>>>>       .map(new NormLossGradient(datasize))
>>>>       .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>>>>       .map(new LBFGS());
>>>>
>>>> // termination criterion: keep iterating as long as iflag is non-zero
>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>    new FilterFunction<State>() {
>>>>       @Override
>>>>       public boolean filter(State value) throws Exception {
>>>>          return value.getIflag()[0] != 0;
>>>>       }
>>>>    }
>>>> );
>>>>
>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>>>>
>>>> *While loop: *
>>>>
>>>> DataSet<double[]> data = ...
>>>>
>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>
>>>> int cnt = 0;
>>>> do {
>>>>    LBFGS lbfgs = new LBFGS();
>>>>    statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>>       .reduce(accumulate)
>>>>       .map(new NormLossGradient(datasize))
>>>>       .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>>>>       .map(lbfgs);
>>>>    cnt++;
>>>> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
>>>>
>>>>
>>
