good evening,

sorry, my english is not the best.

by comupte the new centroid, i will sum all points of the cluster and form the new center. in my other implementation firstly i sum all point and at the end i divides by number of points.
to example: (1+2+3+4)/4=2,5

at flink i reduce always two point to one,
for the example upstairs: (1+2)/2=1,5 --> (1,5+3)/2=2,25 --> (2,25+4)=3,125

how can i rewrite my function so, that it work like my other implementation?

best regards,
paul


Am 22.05.2015 um 16:52 schrieb Stephan Ewen:
Sorry, I don't understand the question.

Can you describe a bit better what you mean with "how i can sum all points and share thoug the counter" ?

Thanks!

On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com <mailto:paul.roewer1...@googlemail.com>> wrote:

    i have fix a bug at the input reading, but the results are still
    different.

    i think i have local the problem, in the other implementation i
    sum all geo points/time points and share thougt the counter.
    but in flink i sum two points and share thougt two, and sum the
    next...

    the method is the following:

    // sums and counts point coordinates
        private static final class CentroidAccumulator implements
    ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

            private static final long serialVersionUID =
    -4868797820391121771L;

            public Tuple2<Integer, GeoTimeDataTupel>
    reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer,
    GeoTimeDataTupel> val2) {
                return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
    addAndDiv(val1.f0,val1.f1,val2.f1));
            }
        }

        private static GeoTimeDataTupel addAndDiv(int
    clusterid,GeoTimeDataTupel input1, GeoTimeDataTupel input2){
            long time = (input1.getTime()+input2.getTime())/2;
            List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
            list.add(input1.getGeo());
            list.add(input2.getGeo());
            LatLongSeriable geo = Geometry.getGeoCenterOf(list);

            return new GeoTimeDataTupel(geo,time,"POINT");
        }

    how i can sum all points and share thoug the counter?


    2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com
    <mailto:paul.roewer1...@googlemail.com>>:

        hi,
        if i print the centroids all are show in the output. i have
        implement k means with map reduce und spark. by same input, i
        get the same output. but in flink i get a one cluster output
        with this input set. (i use csv files from the GDELT projekt)

        here my class:

        public class FlinkMain {


            public static void main(String[] args) {
                //load properties
                Properties pro = new Properties();
                try {
                    pro.load(new
        FileInputStream("./resources/config.properties"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                int maxIteration =
        1;//Integer.parseInt(pro.getProperty("maxiterations"));
                String outputPath = pro.getProperty("flink.output");
                // set up execution environment
                ExecutionEnvironment env =
        ExecutionEnvironment.getExecutionEnvironment();
                // get input points
                DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
                DataSet<GeoTimeDataCenter> centroids =
        getCentroidDataSet(env);
                // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop =
        centroids.iterate(maxIteration);
                DataSet<GeoTimeDataCenter> newCentroids = points
                    // compute closest centroid for each point
                    .map(new
        SelectNearestCenter()).withBroadcastSet(loop, "centroids")
                    // count and sum point coordinates for each centroid
                    .groupBy(0).reduce(new CentroidAccumulator())
                    // compute new centroids from point counts and
        coordinate sums
                    .map(new CentroidAverager());
                // feed new centroids back into next iteration
                DataSet<GeoTimeDataCenter> finalCentroids =
        loop.closeWith(newCentroids);
                DataSet<Tuple2<Integer, GeoTimeDataTupel>>
        clusteredPoints = points
                    // assign points to final clusters
                    .map(new
        SelectNearestCenter()).withBroadcastSet(finalCentroids,
        "centroids");
                // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
                // execute program
                try {
                    env.execute("KMeans Flink");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            private static final class SelectNearestCenter extends
        RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>>
        {

                private static final long serialVersionUID =
        -2729445046389350264L;
                private Collection<GeoTimeDataCenter> centroids;

                @Override
                public void open(Configuration parameters) throws
        Exception {
                    this.centroids =
        getRuntimeContext().getBroadcastVariable("centroids");
                }

                @Override
                public Tuple2<Integer, GeoTimeDataTupel>
        map(GeoTimeDataTupel point) throws Exception {
                    double minDistance = Double.MAX_VALUE;
                    int closestCentroidId= -1;

                    // check all cluster centers
                    for(GeoTimeDataCenter centroid : centroids) {
                        // compute distance
                        double distance = Distance.ComputeDist(point,
        centroid);
                        // update nearest cluster if necessary
                        if(distance < minDistance) {
                            minDistance = distance;
                            closestCentroidId = centroid.getId();
                        }
                    }
                    // emit a new record with the center id and the
        data point
                    return new Tuple2<Integer,
        GeoTimeDataTupel>(closestCentroidId, point);
                }
            }

            // sums and counts point coordinates
            private static final class CentroidAccumulator implements
        ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

                private static final long serialVersionUID =
        -4868797820391121771L;

                public Tuple2<Integer, GeoTimeDataTupel>
        reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer,
        GeoTimeDataTupel> val2) {
                    return new Tuple2<Integer,
        GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1,val2.f1));
                }
            }

            private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel
        input1, GeoTimeDataTupel input2){
                long time = (input1.getTime()+input2.getTime())/2;
                List<LatLongSeriable> list = new
        ArrayList<LatLongSeriable>();
                list.add(input1.getGeo());
                list.add(input2.getGeo());
                LatLongSeriable geo = Geometry.getGeoCenterOf(list);

                return new GeoTimeDataTupel(geo,time,"POINT");
            }

            // computes new centroid from coordinate sum and count of
        points
            private static final class CentroidAverager implements
        MapFunction<Tuple2<Integer, GeoTimeDataTupel>,
        GeoTimeDataCenter> {

                private static final long serialVersionUID =
        -2687234478847261803L;

                public GeoTimeDataCenter map(Tuple2<Integer,
        GeoTimeDataTupel> value) {
                    return new GeoTimeDataCenter(value.f0,
        value.f1.getGeo(),value.f1.getTime());
                }
            }

            private static DataSet<GeoTimeDataTupel>
        getPointDataSet(ExecutionEnvironment env) {
                // load properties
                Properties pro = new Properties();
                try {
                    pro.load(new
        FileInputStream("./resources/config.properties"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                String inputFile = pro.getProperty("input");
                // map csv file
                return env.readCsvFile(inputFile)
                    .ignoreInvalidLines()
                    .fieldDelimiter('\u0009')
                    //.fieldDelimiter("\t")
                    //.lineDelimiter("\n")
                    .includeFields(true, true, false, false, false,
        false, false, false, false, false, false
                            , false, false, false, false, false,
        false, false, false, false, false
                            , false, false, false, false, false,
        false, false, false, false, false
                            , false, false, false, false, false,
        false, false, false, true, true
                            , false, false, false, false, false,
        false, false, false, false, false
                            , false, false, false, false, false,
        false, false, false)
        //.includeFields(true,true,true,true)
                    .types(String.class, Long.class, Double.class,
        Double.class)
                    .map(new TuplePointConverter());
            }

            private static final class TuplePointConverter implements
        MapFunction<Tuple4<String, Long, Double, Double>,
        GeoTimeDataTupel>{

                private static final long serialVersionUID =
        3485560278562719538L;

                public GeoTimeDataTupel map(Tuple4<String, Long,
        Double, Double> t) throws Exception {
                    return new GeoTimeDataTupel(new
        LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
                }
            }

            private static DataSet<GeoTimeDataCenter>
        getCentroidDataSet(ExecutionEnvironment env) {
                // load properties
                Properties pro = new Properties();
                try {
                    pro.load(new
        FileInputStream("./resources/config.properties"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                String seedFile = pro.getProperty("seed.file");
                boolean seedFlag =
        Boolean.parseBoolean(pro.getProperty("seed.flag"));
                // get points from file or random
                if(seedFlag || !(new File(seedFile+"-1").exists())) {
                    Seeding.randomSeeding();
                }
                // map csv file
                return env.readCsvFile(seedFile+"-1")
                    .lineDelimiter("\n")
                    .fieldDelimiter('\u0009')
                    //.fieldDelimiter("\t")
                    .includeFields(true, true, true, true)
                    .types(Integer.class, Double.class, Double.class,
        Long.class)
                    .map(new TupleCentroidConverter());
            }

            private static final class TupleCentroidConverter
        implements MapFunction<Tuple4<Integer, Double, Double, Long>,
        GeoTimeDataCenter>{

                private static final long serialVersionUID =
        -1046538744363026794L;

                public GeoTimeDataCenter map(Tuple4<Integer, Double,
        Double, Long> t) throws Exception {
                    return new GeoTimeDataCenter(t.f0,new
        LatLongSeriable(t.f1, t.f2), t.f3);
                }
            }
        }

        2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org
        <mailto:trohrm...@apache.org>>:

            Concerning your first problem that you only see one
            resulting centroid, your code looks good modulo the parts
            you haven't posted.

            However, your problem could simply be caused by a bad
            selection of initial centroids. If, for example, all
            centroids except for one don't get any points assigned,
            then only one centroid will survive the iteration step.
            How do you do it?

            To check that all centroids are read you can print the
            contents of the centroids DataSet. Furthermore, you can
            simply println the new centroids after each iteration
            step. In local mode you can then observe the computation.

            Cheers,
            Till

            On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen
            <se...@apache.org <mailto:se...@apache.org>> wrote:

                Hi!

                This problem should not depend on any user code. There
                are no user-code dependent actors in Flink.

                Is there more stack trace that you can send us? It
                looks like it misses the core exception that is
                causing the issue is not part of the stack trace.

                Greetings,
                Stephan



                On Thu, May 21, 2015 at 11:11 AM, Pa Rö
                <paul.roewer1...@googlemail.com
                <mailto:paul.roewer1...@googlemail.com>> wrote:

                    hi flink community,

                    i have implement k-means for clustering temporal
                    geo data. i use the following github project and
                    my own data structure:
                    
https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

                    not i have the problem, that flink read the
                    centroids from file and work parallel futher. if i
                    look at the results, i have the feeling, that the
                    prgramm load only one centroid point.

                    i work with flink 0.8.1, if i update to 0.9
                    milestone 1 i get the following exception:
                    ERROR actor.OneForOneStrategy: exception during
                    creation
                    akka.actor.ActorInitializationException: exception
                    during creation
                        at
                    
akka.actor.ActorInitializationException$.apply(Actor.scala:218)
                        at
                    akka.actor.ActorCell.create(ActorCell.scala:578)
                        at
                    akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
                        at
                    akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
                        at
                    
akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
                        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
                        at
                    
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
                        at
                    
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                        at
                    
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                        at
                    
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                        at
                    
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
                    Caused by: java.lang.reflect.InvocationTargetException
                        at
                    
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
                    Method)
                        at
                    
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
                        at
                    
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
                        at
                    
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
                        at
                    akka.util.Reflect$.instantiate(Reflect.scala:65)
                        at akka.actor.Props.newActor(Props.scala:337)
                        at
                    akka.actor.ActorCell.newActor(ActorCell.scala:534)
                        at
                    akka.actor.ActorCell.create(ActorCell.scala:560)
                        ... 9 more

                    how can i say flink, that it should be wait for
                    loading dataset, and what say this exception?

                    best regards,
                    paul







Reply via email to