Usually, the while loop solution should perform much worse, since each new iteration re-executes all previous iteration steps without persisting the intermediate results. Thus, it should have quadratic complexity in the number of iteration step operations instead of linear complexity. Additionally, the while loop will suffer from memory fragmentation because of the explicit unrolling of the DAG.
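For illustration, here is a minimal, self-contained sketch of the driver-loop pattern I mean (a hypothetical example, not Dan's code; the trivial map and the names are made up): every collect() submits a new job whose plan contains all operators added so far, so the total work over niter passes grows quadratically with the number of iteration steps, whereas a native Flink iteration executes each step only once.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DriverLoopSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        int niter = 9;

        DataSet<Integer> state = env.fromElements(0);
        for (int i = 0; i < niter; i++) {
            // Each pass appends one more operator to the plan of 'state'...
            state = state.map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value + 1;
                }
            });
            // ...and collect() submits a fresh job that re-executes the whole
            // lineage from the source, so pass i repeats the work of passes 0..i-1.
            System.out.println(state.collect());
        }
    }
}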
I agree with Theo that access to the full code would help a lot to pinpoint the problem.

Cheers,
Till

On Tue, Sep 6, 2016 at 6:50 PM, Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote:

> Have you tried profiling the application to see where most of the time is spent during the runs?
>
> If most of the time is spent reading in the data, maybe any difference between the two methods is being obscured.
>
> --
> Sent from a mobile device. May contain autocorrect errors.
>
> On Sep 6, 2016 4:55 PM, "Greg Hogan" <c...@greghogan.com> wrote:
>
>> Hi Dan,
>>
>> Flink currently allocates each task slot an equal portion of managed memory. I don't know the best way to count task slots.
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html#workers-slots-resources
>>
>> If you assign TaskManagers less memory, then Linux will use the memory to cache spill files.
>>
>> Greg
>>
>> On Fri, Sep 2, 2016 at 11:30 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
>>
>>> Hi Greg,
>>>
>>> thanks for your response!
>>>
>>> I just had a look and realized that it's just about 85 GB of data. Sorry about that wrong information.
>>>
>>> It's read from a csv file on the master node's local file system. The 8 nodes have more than 40 GB of available memory each, and since the data is equally distributed I assume there should be no need to spill anything to disk.
>>>
>>> There are 9 iterations.
>>>
>>> Is it possible that also with Flink Iterations the data is repeatedly distributed? Or the other way around: might it be that Flink "remembers" somehow that the data is already distributed, even for the while loop?
>>>
>>> -Dan
>>>
>>> On 02.09.2016 at 16:39, Greg Hogan wrote:
>>>
>>> Hi Dan,
>>>
>>> Where are you reading the 200 GB "data" from? How much memory per node? If the DataSet is read from a distributed filesystem, and if with iterations Flink must spill to disk, then I wouldn't expect much difference. About how many iterations are run in the 30 minutes? I don't know that this is reported explicitly, but if your convergence function only has one input record per iteration then the reported total is the iteration count.
>>>
>>> One other thought: we should soon have support for object reuse with arrays (FLINK-3695). This would be implemented as DoubleValueArray or ValueArray<DoubleValue> rather than double[], but it would be interesting to test for a change in performance.
>>>
>>> Greg
>>>
>>> On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> for my bachelor thesis I'm testing an implementation of the L-BFGS algorithm with Flink Iterations against a version without Flink Iterations but a casual while loop instead. Both programs use the same Map and Reduce transformations in each iteration. It was expected that the performance of the Flink Iterations would scale better with increasing size of the input data set. However, the measured results on an IBM Power cluster are very similar for both versions, e.g. around 30 minutes for 200 GB of data. The cluster has 8 nodes, was configured with 4 slots per node, and I used a total parallelism of 32.
>>>> In every iteration of the while loop a new Flink job is started, and I thought that the data would also be distributed over the network again in each iteration, which should consume a significant and measurable amount of time.
>>>> Is that thought wrong, or what is the computational overhead of the Flink iterations that is equalizing this disadvantage?
>>>> I include the relevant part of both programs and also attach the generated execution plans.
>>>> Thank you for any ideas, as I could not find much about this issue in the Flink docs.
>>>>
>>>> Best, Dan
>>>>
>>>> *Flink Iterations:*
>>>>
>>>> DataSet<double[]> data = ...
>>>>
>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>> DataSet<State> statedataset = env.fromElements(state);
>>>>
>>>> // start of iteration section
>>>> IterativeDataSet<State> loop = statedataset.iterate(niter);
>>>>
>>>> DataSet<State> statewithnewlossgradient = data.map(difffunction).withBroadcastSet(loop, "state")
>>>>     .reduce(accumulate)
>>>>     .map(new NormLossGradient(datasize))
>>>>     .map(new SetLossGradient()).withBroadcastSet(loop, "state")
>>>>     .map(new LBFGS());
>>>>
>>>> DataSet<State> converged = statewithnewlossgradient.filter(
>>>>     new FilterFunction<State>() {
>>>>         @Override
>>>>         public boolean filter(State value) throws Exception {
>>>>             return value.getIflag()[0] != 0;
>>>>         }
>>>>     }
>>>> );
>>>>
>>>> DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);
>>>>
>>>> *While loop:*
>>>>
>>>> DataSet<double[]> data = ...
>>>> State state = initialState(m, initweights, 0, new double[initweights.length]);
>>>> DataSet<State> statedataset = env.fromElements(state); // initial broadcast DataSet (implied by the loop body below)
>>>> int cnt = 0;
>>>> do {
>>>>     LBFGS lbfgs = new LBFGS();
>>>>     statedataset = data.map(difffunction).withBroadcastSet(statedataset, "state")
>>>>         .reduce(accumulate)
>>>>         .map(new NormLossGradient(datasize))
>>>>         .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
>>>>         .map(lbfgs);
>>>>     cnt++;
>>>> } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);
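As a side note, one way to test whether the repeated re-execution really dominates in the while-loop version would be to rebuild the broadcast DataSet from the collected value in every pass, so that each submitted job contains exactly one iteration step. A rough, untested sketch based on the snippets above (it assumes State is serializable by Flink and reuses env, data, difffunction, accumulate, NormLossGradient, SetLossGradient and LBFGS from Dan's code):

State current = initialState(m, initweights, 0, new double[initweights.length]);
int cnt = 0;
boolean done = false;
do {
    // Re-create the broadcast DataSet from the materialized value, so the
    // plan of every submitted job contains only a single iteration step.
    DataSet<State> statedataset = env.fromElements(current);
    current = data.map(difffunction).withBroadcastSet(statedataset, "state")
        .reduce(accumulate)
        .map(new NormLossGradient(datasize))
        .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
        .map(new LBFGS())
        .collect().get(0);
    cnt++;
    done = (current.getIflag()[0] == 0);
} while (cnt < niter && !done);

If the runtime of this variant still matches the Flink Iterations version, the bottleneck is probably not the growing lineage but something else, e.g. reading and re-distributing the input in every pass.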