Hi,

I am not broadcasting the data but the model, i.e. the weight vector
contained in the "State".

You are right that it would be better for the while-loop implementation
to have the data on HDFS. But that is exactly the point of my question:
why are the Flink Iterations not faster when the data is not directly
available to the workers via HDFS?

-Dan


On 05.09.2016 at 16:10, Theodore Vasiloudis wrote:
Hello Dan,

Are you broadcasting the 85 GB of data then? I don't see why you
wouldn't store that file on HDFS so that it is accessible to your workers.
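
For example, something along these lines should do it (the path is a
placeholder, and the parsing assumes one comma-separated vector of
doubles per line):

DataSet<double[]> data = env
   .readTextFile("hdfs:///path/to/data.csv")   // placeholder HDFS path
   .map(new MapFunction<String, double[]>() {
      @Override public double[] map(String line) {
         // parse one comma-separated line into a double[] record
         String[] parts = line.split(",");
         double[] values = new double[parts.length];
         for (int i = 0; i < parts.length; i++) {
            values[i] = Double.parseDouble(parts[i]);
         }
         return values;
      }
   });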


If you have the full code available somewhere we might be able to help
better.

For L-BFGS you should only be broadcasting the model (i.e. the weight
vector), and yes that would happen at each iteration, since you are
updating the model at each iteration.
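
For reference, here is a minimal sketch of how the broadcast model is
typically read on the worker side (an illustration only, not your actual
difffunction; the getWeights() accessor and the partial-gradient
computation are placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DiffFunction extends RichMapFunction<double[], double[]> {

   private double[] weights;

   @Override public void open(Configuration parameters) {
      // "state" is the name used in withBroadcastSet(..., "state").
      // Only this small State object is shipped to the workers in each
      // iteration, not the input DataSet itself.
      State state = (State) getRuntimeContext()
            .getBroadcastVariable("state").get(0);
      weights = state.getWeights();   // assumed accessor on your State class
   }

   @Override public double[] map(double[] record) {
      // Placeholder: compute this record's contribution to the loss
      // gradient against the broadcast weights.
      double[] partial = new double[weights.length];
      for (int i = 0; i < weights.length && i < record.length; i++) {
         partial[i] = record[i] * weights[i];
      }
      return partial;
   }
}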

On Fri, Sep 2, 2016 at 5:30 PM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

    Hi Greg,

    thanks for your response!

    I just had a look and realized that it's just about 85 GB of data.
    Sorry about that wrong information.

    It is read from a CSV file on the master node's local file system.
    The 8 nodes have more than 40 GB of available memory each, and since
    the data is equally distributed I assume there should be no need to
    spill anything to disk.

    There are 9 iterations.

    Is it possible that the data is repeatedly distributed even with
    Flink Iterations? Or, the other way around: might it be that Flink
    somehow "remembers" that the data is already distributed, even for
    the while loop?

    -Dan



    On 02.09.2016 at 16:39, Greg Hogan wrote:
    Hi Dan,

    Where are you reading the 200 GB of "data" from? How much memory per
    node? If the DataSet is read from a distributed filesystem, and if
    Flink must spill to disk during the iterations, then I wouldn't
    expect much difference. About how many iterations are run in the 30
    minutes? I don't know that this is reported explicitly, but if your
    convergence function only has one input record per iteration, then
    the reported total is the iteration count.

    One other thought: we should soon have support for object reuse
    with arrays (FLINK-3695). This would be implemented as
    DoubleValueArray or ValueArray<DoubleValue> rather than double[],
    but it would be interesting to test for a change in performance.
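
    Just to illustrate the object-reuse idea with Flink's existing
    mutable value types (a sketch only, not the FLINK-3695 array API):

        DoubleValue reuse = new DoubleValue();
        reuse.setValue(42.0);          // overwrite in place, no new allocation
        double x = reuse.getValue();   // read the current value back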

    Greg

    On Fri, Sep 2, 2016 at 6:16 AM, Dan Drewes <dre...@campus.tu-berlin.de> wrote:

        Hi,

        for my bachelor thesis I'm testing an implementation of the
        L-BFGS algorithm with Flink Iterations against a version without
        Flink Iterations that uses a plain while loop instead. Both
        programs use the same Map and Reduce transformations in each
        iteration. The expectation was that the performance of the Flink
        Iterations would scale better with increasing size of the input
        data set. However, the measured results on an IBM Power cluster
        are very similar for both versions, e.g. around 30 minutes for
        200 GB of data. The cluster has 8 nodes, was configured with 4
        slots per node, and I used a total parallelism of 32.
        In every iteration of the while loop a new Flink job is started,
        and I thought that the data would therefore be distributed over
        the network again in each iteration, which should consume a
        significant and measurable amount of time. Is that assumption
        wrong, or what is the computational overhead of the Flink
        Iterations that cancels out this disadvantage?
        I include the relevant parts of both programs and also attach
        the generated execution plans.
        Thank you for any ideas, as I could not find much about this
        issue in the Flink docs.

        Best, Dan

        *Flink Iterations:*

        DataSet<double[]> data = ...

        State state = initialState(m, initweights, 0, new double[initweights.length]);
        DataSet<State> statedataset = env.fromElements(state);

        // start of iteration section
        IterativeDataSet<State> loop = statedataset.iterate(niter);

        DataSet<State> statewithnewlossgradient = data
           .map(difffunction).withBroadcastSet(loop, "state")
           .reduce(accumulate)
           .map(new NormLossGradient(datasize))
           .map(new SetLossGradient()).withBroadcastSet(loop, "state")
           .map(new LBFGS());

        DataSet<State> converged = statewithnewlossgradient.filter(
           new FilterFunction<State>() {
              @Override public boolean filter(State value) throws Exception {
                 // keep a record only while L-BFGS signals "not yet converged";
                 // the iteration terminates once this DataSet becomes empty
                 return value.getIflag()[0] != 0;
              }
           }
        );

        DataSet<State> finalstate = loop.closeWith(statewithnewlossgradient, converged);

        *While loop:*

        DataSet<double[]> data = ...
        State state = initialState(m, initweights, 0, new double[initweights.length]);
        DataSet<State> statedataset = env.fromElements(state);

        int cnt = 0;
        do {
           LBFGS lbfgs = new LBFGS();
           statedataset = data
              .map(difffunction).withBroadcastSet(statedataset, "state")
              .reduce(accumulate)
              .map(new NormLossGradient(datasize))
              .map(new SetLossGradient()).withBroadcastSet(statedataset, "state")
              .map(lbfgs);
           cnt++;
        } while (cnt < niter && statedataset.collect().get(0).getIflag()[0] != 0);


