Sorry, I don't understand the question. Can you describe a bit better what you mean by "sum all points and divide by the count"?

Thanks!
On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

> I have fixed a bug in the input reading, but the results are still
> different.
>
> I think I have located the problem: in the other implementations I sum
> all geo points/time points and divide by the count. But in Flink I sum
> two points and divide by two, then sum the next pair, and so on...
>
> The method is the following:
>
> // sums and counts point coordinates
> private static final class CentroidAccumulator implements
>         ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>
>     private static final long serialVersionUID = -4868797820391121771L;
>
>     public Tuple2<Integer, GeoTimeDataTupel> reduce(
>             Tuple2<Integer, GeoTimeDataTupel> val1,
>             Tuple2<Integer, GeoTimeDataTupel> val2) {
>         return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>                 addAndDiv(val1.f0, val1.f1, val2.f1));
>     }
> }
>
> private static GeoTimeDataTupel addAndDiv(int clusterid,
>         GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>     long time = (input1.getTime() + input2.getTime()) / 2;
>     List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>     list.add(input1.getGeo());
>     list.add(input2.getGeo());
>     LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>
>     return new GeoTimeDataTupel(geo, time, "POINT");
> }
>
> How can I sum all points and divide by the count?
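For comparison, the pattern in Flink's own KMeans example keeps a running count in the tuple, only adds inside the reduce, and divides once at the very end. Below is a minimal sketch of that pattern adapted to the types in this thread; the getLat()/getLong() accessors on LatLongSeriable and the "SUM" marker string are assumptions for illustration, not code from the original post.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Sketch: the tuple is (centroid id, coordinate/time sums, points summed so far).
// The reduce only adds -- no division happens here -- so the result is an exact
// sum over all points of the group instead of a pairwise running average.
public class CentroidSumReducer
        implements ReduceFunction<Tuple3<Integer, GeoTimeDataTupel, Long>> {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple3<Integer, GeoTimeDataTupel, Long> reduce(
            Tuple3<Integer, GeoTimeDataTupel, Long> val1,
            Tuple3<Integer, GeoTimeDataTupel, Long> val2) {
        // getLat()/getLong() are assumed accessors on LatLongSeriable
        LatLongSeriable geoSum = new LatLongSeriable(
                val1.f1.getGeo().getLat() + val2.f1.getGeo().getLat(),
                val1.f1.getGeo().getLong() + val2.f1.getGeo().getLong());
        long timeSum = val1.f1.getTime() + val2.f1.getTime();
        return new Tuple3<Integer, GeoTimeDataTupel, Long>(
                val1.f0,
                new GeoTimeDataTupel(geoSum, timeSum, "SUM"),
                val1.f2 + val2.f2);
    }
}

With this layout, SelectNearestCenter would emit each point as a Tuple3 with a count of 1L in the last field, and groupBy(0) keeps working unchanged since the centroid id stays in field 0.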
>
> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>
>> Hi,
>> if I print the centroids, all of them show up in the output. I have
>> implemented k-means with MapReduce and Spark; given the same input, both
>> produce the same output. But in Flink I get a one-cluster output for this
>> input set. (I use CSV files from the GDELT project.)
>>
>> Here is my class:
>>
>> public class FlinkMain {
>>
>>     public static void main(String[] args) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         int maxIteration = 1; // Integer.parseInt(pro.getProperty("maxiterations"));
>>         String outputPath = pro.getProperty("flink.output");
>>         // set up execution environment
>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>         // get input points
>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>         // set number of bulk iterations for KMeans algorithm
>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>             // compute closest centroid for each point
>>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>             // count and sum point coordinates for each centroid
>>             .groupBy(0).reduce(new CentroidAccumulator())
>>             // compute new centroids from point counts and coordinate sums
>>             .map(new CentroidAverager());
>>         // feed new centroids back into next iteration
>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>             // assign points to final clusters
>>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>         // emit result
>>         clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>         finalCentroids.writeAsText(outputPath + "/centers"); // print();
>>         // execute program
>>         try {
>>             env.execute("KMeans Flink");
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     private static final class SelectNearestCenter extends
>>             RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID = -2729445046389350264L;
>>         private Collection<GeoTimeDataCenter> centroids;
>>
>>         @Override
>>         public void open(Configuration parameters) throws Exception {
>>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>         }
>>
>>         @Override
>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>>             double minDistance = Double.MAX_VALUE;
>>             int closestCentroidId = -1;
>>
>>             // check all cluster centers
>>             for (GeoTimeDataCenter centroid : centroids) {
>>                 // compute distance
>>                 double distance = Distance.ComputeDist(point, centroid);
>>                 // update nearest cluster if necessary
>>                 if (distance < minDistance) {
>>                     minDistance = distance;
>>                     closestCentroidId = centroid.getId();
>>                 }
>>             }
>>             // emit a new record with the center id and the data point
>>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>>         }
>>     }
>>
>>     // sums and counts point coordinates
>>     private static final class CentroidAccumulator implements
>>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>
>>         private static final long serialVersionUID = -4868797820391121771L;
>>
>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>                     addAndDiv(val1.f1, val2.f1));
>>         }
>>     }
>>
>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>             GeoTimeDataTupel input2) {
>>         long time = (input1.getTime() + input2.getTime()) / 2;
>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>         list.add(input1.getGeo());
>>         list.add(input2.getGeo());
>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>
>>         return new GeoTimeDataTupel(geo, time, "POINT");
>>     }
>>
>>     // computes new centroid from coordinate sum and count of points
>>     private static final class CentroidAverager implements
>>             MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>
>>         private static final long serialVersionUID = -2687234478847261803L;
>>
>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>>             return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>>         }
>>     }
>>
>>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         String inputFile = pro.getProperty("input");
>>         // map csv file
>>         return env.readCsvFile(inputFile)
>>             .ignoreInvalidLines()
>>             .fieldDelimiter('\u0009')
>>             //.fieldDelimiter("\t")
>>             //.lineDelimiter("\n")
>>             .includeFields(true, true, false, false, false, false, false, false, false, false, false,
>>                     false, false, false, false, false, false, false, false, false, false,
>>                     false, false, false, false, false, false, false, false, false, false,
>>                     false, false, false, false, false, false, false, false, true, true,
>>                     false, false, false, false, false, false, false, false, false, false,
>>                     false, false, false, false, false, false, false, false)
>>             //.includeFields(true, true, true, true)
>>             .types(String.class, Long.class, Double.class, Double.class)
>>             .map(new TuplePointConverter());
>>     }
>>
>>     private static final class TuplePointConverter implements
>>             MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>>
>>         private static final long serialVersionUID = 3485560278562719538L;
>>
>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>>         }
>>     }
>>
>>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>>         // load properties
>>         Properties pro = new Properties();
>>         try {
>>             pro.load(new FileInputStream("./resources/config.properties"));
>>         } catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>         String seedFile = pro.getProperty("seed.file");
>>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>         // get points from file or random
>>         if (seedFlag || !(new File(seedFile + "-1").exists())) {
>>             Seeding.randomSeeding();
>>         }
>>         // map csv file
>>         return env.readCsvFile(seedFile + "-1")
>>             .lineDelimiter("\n")
>>             .fieldDelimiter('\u0009')
>>             //.fieldDelimiter("\t")
>>             .includeFields(true, true, true, true)
>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>             .map(new TupleCentroidConverter());
>>     }
>>
>>     private static final class TupleCentroidConverter implements
>>             MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>>
>>         private static final long serialVersionUID = -1046538744363026794L;
>>
>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>>         }
>>     }
>> }
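Following on from the Tuple3 sketch earlier in the thread, the division by the total count would then happen exactly once per centroid, in the map after the reduce, rather than pairwise inside it. The coordinate accessors on LatLongSeriable are again assumed for illustration.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;

// Sketch: divide the accumulated sums by the total point count once,
// after the reduce has already seen every point of the group.
public class CentroidSumAverager implements
        MapFunction<Tuple3<Integer, GeoTimeDataTupel, Long>, GeoTimeDataCenter> {

    private static final long serialVersionUID = 1L;

    @Override
    public GeoTimeDataCenter map(Tuple3<Integer, GeoTimeDataTupel, Long> value) {
        long count = value.f2;
        LatLongSeriable geo = new LatLongSeriable(   // assumed accessors
                value.f1.getGeo().getLat() / count,
                value.f1.getGeo().getLong() / count);
        return new GeoTimeDataCenter(value.f0, geo, value.f1.getTime() / count);
    }
}

In the posted pipeline this pair would stand in for reduce(new CentroidAccumulator()).map(new CentroidAverager()), with SelectNearestCenter changed to attach the count of 1L per point as described above.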
>>
>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>
>>> Concerning your first problem, that you only see one resulting centroid:
>>> your code looks good, modulo the parts you haven't posted.
>>>
>>> However, your problem could simply be caused by a bad selection of the
>>> initial centroids. If, for example, all centroids except one don't get
>>> any points assigned, then only one centroid will survive the iteration
>>> step. How do you select them?
>>>
>>> To check that all centroids are read, you can print the contents of the
>>> centroids DataSet. Furthermore, you can simply println the new centroids
>>> after each iteration step. In local mode you can then observe the
>>> computation.
>>>
>>> Cheers,
>>> Till
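A sketch of the inspection Till suggests, reusing the variable names from the posted main method. The DataSet API does not allow sinks such as print() inside an iteration, so the usual way to watch the intermediate centroids in local mode is a pass-through map that logs each element (this assumes GeoTimeDataCenter has a readable toString()):

// Check once, outside the loop, that every seed centroid was parsed:
DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
centroids.print(); // in local mode this lists all centroids read from the seed file

// Watch the centroids evolve: a pass-through map that logs each element,
// applied to newCentroids just before loop.closeWith(newCentroids).
DataSet<GeoTimeDataCenter> loggedCentroids = newCentroids
        .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
            public GeoTimeDataCenter map(GeoTimeDataCenter c) {
                System.out.println("centroid after step: " + c);
                return c; // element passes through unchanged
            }
        });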
>>>
>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> This problem should not depend on any user code. There are no
>>>> user-code-dependent actors in Flink.
>>>>
>>>> Is there more stack trace that you can send us? It looks like the core
>>>> exception that is causing the issue is not part of the stack trace.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>>>
>>>>> Hi Flink community,
>>>>>
>>>>> I have implemented k-means for clustering temporal geo data. I use the
>>>>> following GitHub project together with my own data structures:
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>
>>>>> Now I have the problem that Flink reads the centroids from a file and
>>>>> then continues working in parallel. When I look at the results, I have
>>>>> the feeling that the program loads only one centroid point.
>>>>>
>>>>> I work with Flink 0.8.1; if I update to 0.9-milestone-1, I get the
>>>>> following exception:
>>>>>
>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>     ... 9 more
>>>>>
>>>>> How can I tell Flink to wait until the dataset is loaded, and what does
>>>>> this exception mean?
>>>>>
>>>>> best regards,
>>>>> paul