Sure, here is some pseudocode:

public class CentroidMover implements GroupReduceFunction<Point, Centroid> {
    public void reduce(Iterable<Point> points, Collector<Centroid> out) {
        int cnt = 0;
        Centroid sum = new Centroid(0, 0);
        for (Point p : points) {
            sum = sum + p; // (your addition logic goes here)
            cnt++;
        }
        out.collect(sum / cnt); // your division logic goes here
    }
}

This function computes the sum and the count of a group, and from those the final average. Is this what you are looking for?
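Filled in with the GeoTimeDataTupel / GeoTimeDataCenter types that appear in Paul's code further down this thread, a complete version could look roughly like the sketch below. Geometry.getGeoCenterOf, LatLongSeriable and the constructors are taken from that code; the rest is an untested sketch, not a drop-in implementation:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Computes the true centroid of one cluster: sum over ALL points first,
// divide once at the end. GeoTimeDataTupel, GeoTimeDataCenter,
// LatLongSeriable and Geometry are the user-defined types from the
// code quoted below in this thread.
public class CentroidMover implements
        GroupReduceFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {

    public void reduce(Iterable<Tuple2<Integer, GeoTimeDataTupel>> points,
            Collector<GeoTimeDataCenter> out) throws Exception {
        int clusterId = -1;
        long timeSum = 0;
        long cnt = 0;
        List<LatLongSeriable> geos = new ArrayList<LatLongSeriable>();
        for (Tuple2<Integer, GeoTimeDataTupel> p : points) {
            clusterId = p.f0;          // the same id for the whole group
            timeSum += p.f1.getTime(); // sum first ...
            geos.add(p.f1.getGeo());
            cnt++;
        }
        // ... and divide once, over all points of the cluster
        out.collect(new GeoTimeDataCenter(clusterId,
                Geometry.getGeoCenterOf(geos), timeSum / cnt));
    }
}

In the FlinkMain job quoted below, .groupBy(0).reduce(new CentroidAccumulator()) followed by .map(new CentroidAverager()) would then collapse into a single .groupBy(0).reduceGroup(new CentroidMover()).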
2015-05-26 11:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> Thanks for your message.
>
> Maybe you can give me an example for the GroupReduceFunction?
>
> 2015-05-22 23:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> There are two ways to do that:
>>
>> 1) You use a GroupReduceFunction, which gives you an iterator over all
>> points, similar to Hadoop's ReduceFunction.
>> 2) You use the ReduceFunction to compute the sum and the count at the
>> same time (e.g., in two fields of a Tuple2) and use a MapFunction to do
>> the final division.
>>
>> I'd go with the first choice. It's easier.
>>
>> Best, Fabian
>>
>> 2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1...@googlemail.com>:
>>
>>> Good evening,
>>>
>>> sorry, my English is not the best.
>>>
>>> To compute the new centroid, I want to sum all points of the cluster
>>> and form the new center from that.
>>> In my other implementation, I first sum all points and only at the end
>>> divide by the number of points.
>>> For example: (1+2+3+4)/4 = 2.5
>>>
>>> In Flink, however, I always reduce two points to one, so for the
>>> example above: (1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125
>>>
>>> How can I rewrite my function so that it works like my other
>>> implementation?
>>>
>>> Best regards,
>>> Paul
>>>
>>> On 22.05.2015 at 16:52, Stephan Ewen wrote:
>>>
>>> Sorry, I don't understand the question.
>>>
>>> Can you describe a bit better what you mean by "how I can sum all
>>> points and divide by the counter"?
>>>
>>> Thanks!
>>>
>>> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com>
>>> wrote:
>>>
>>>> I have fixed a bug in the input reading, but the results are still
>>>> different.
>>>>
>>>> I think I have localized the problem: in the other implementation I
>>>> sum all geo points/time points and divide by the counter.
>>>> But in Flink I sum two points and divide by two, then add the next...
>>>>
>>>> The method is the following:
>>>>
>>>>     // sums and counts point coordinates
>>>>     private static final class CentroidAccumulator implements
>>>>             ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>
>>>>         private static final long serialVersionUID = -4868797820391121771L;
>>>>
>>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>>                 Tuple2<Integer, GeoTimeDataTupel> val1,
>>>>                 Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>>                     addAndDiv(val1.f0, val1.f1, val2.f1));
>>>>         }
>>>>     }
>>>>
>>>>     private static GeoTimeDataTupel addAndDiv(int clusterid,
>>>>             GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>>>>         long time = (input1.getTime() + input2.getTime()) / 2;
>>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>         list.add(input1.getGeo());
>>>>         list.add(input2.getGeo());
>>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>
>>>>         return new GeoTimeDataTupel(geo, time, "POINT");
>>>>     }
>>>>
>>>> How can I sum all points and divide by the counter?
>>>>
>>>> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>>>
>>>>> Hi,
>>>>> if I print the centroids, all of them are shown in the output.
>>>>> I have implemented k-means with MapReduce and with Spark; given the
>>>>> same input, both produce the same output. But in Flink I get a
>>>>> single-cluster output for this input set. (I use CSV files from the
>>>>> GDELT project.)
>>>>>
>>>>> Here is my class:
>>>>>
>>>>>     public class FlinkMain {
>>>>>
>>>>>         public static void main(String[] args) {
>>>>>             // load properties
>>>>>             Properties pro = new Properties();
>>>>>             try {
>>>>>                 pro.load(new FileInputStream("./resources/config.properties"));
>>>>>             } catch (Exception e) {
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>             int maxIteration = 1; // Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>             String outputPath = pro.getProperty("flink.output");
>>>>>             // set up execution environment
>>>>>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>             // get input points
>>>>>             DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>             DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>>>             // set number of bulk iterations for the KMeans algorithm
>>>>>             IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>>             DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>                 // compute closest centroid for each point
>>>>>                 .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>>>>                 // count and sum point coordinates for each centroid
>>>>>                 .groupBy(0).reduce(new CentroidAccumulator())
>>>>>                 // compute new centroids from point counts and coordinate sums
>>>>>                 .map(new CentroidAverager());
>>>>>             // feed new centroids back into next iteration
>>>>>             DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>>             DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>>                 // assign points to final clusters
>>>>>                 .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>             // emit result
>>>>>             clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
>>>>>             finalCentroids.writeAsText(outputPath + "/centers"); // print();
>>>>>             // execute program
>>>>>             try {
>>>>>                 env.execute("KMeans Flink");
>>>>>             } catch (Exception e) {
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private static final class SelectNearestCenter extends
>>>>>                 RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>>>>>
>>>>>             private static final long serialVersionUID = -2729445046389350264L;
>>>>>             private Collection<GeoTimeDataCenter> centroids;
>>>>>
>>>>>             @Override
>>>>>             public void open(Configuration parameters) throws Exception {
>>>>>                 this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point)
>>>>>                     throws Exception {
>>>>>                 double minDistance = Double.MAX_VALUE;
>>>>>                 int closestCentroidId = -1;
>>>>>
>>>>>                 // check all cluster centers
>>>>>                 for (GeoTimeDataCenter centroid : centroids) {
>>>>>                     // compute distance
>>>>>                     double distance = Distance.ComputeDist(point, centroid);
>>>>>                     // update nearest cluster if necessary
>>>>>                     if (distance < minDistance) {
>>>>>                         minDistance = distance;
>>>>>                         closestCentroidId = centroid.getId();
>>>>>                     }
>>>>>                 }
>>>>>                 // emit a new record with the center id and the data point
>>>>>                 return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>>>>>             }
>>>>>         }
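>>>>>         // NOTE: the pairwise reduce below computes an average of
>>>>>         // averages ((1+2)/2 = 1.5, then (1.5+3)/2 = 2.25, ...) instead
>>>>>         // of the true mean over all points of a cluster -- the
>>>>>         // GroupReduceFunction sketch at the top of this thread sums
>>>>>         // first and divides only once.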
>>>>>         // sums and counts point coordinates
>>>>>         private static final class CentroidAccumulator implements
>>>>>                 ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>>
>>>>>             private static final long serialVersionUID = -4868797820391121771L;
>>>>>
>>>>>             public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>>>                     Tuple2<Integer, GeoTimeDataTupel> val1,
>>>>>                     Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>>                 return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>>>                         addAndDiv(val1.f1, val2.f1));
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>>>>                 GeoTimeDataTupel input2) {
>>>>>             long time = (input1.getTime() + input2.getTime()) / 2;
>>>>>             List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>>             list.add(input1.getGeo());
>>>>>             list.add(input2.getGeo());
>>>>>             LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>>
>>>>>             return new GeoTimeDataTupel(geo, time, "POINT");
>>>>>         }
>>>>>
>>>>>         // computes the new centroid from coordinate sum and count of points
>>>>>         private static final class CentroidAverager implements
>>>>>                 MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>>>
>>>>>             private static final long serialVersionUID = -2687234478847261803L;
>>>>>
>>>>>             public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>>>>>                 return new GeoTimeDataCenter(value.f0,
>>>>>                         value.f1.getGeo(), value.f1.getTime());
>>>>>             }
>>>>>         }
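>>>>>         // NOTE: despite its name, this map only repackages the already
>>>>>         // pairwise-averaged point as a center; no count is divided out
>>>>>         // here.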
>>>>>         private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>>>>             // load properties
>>>>>             Properties pro = new Properties();
>>>>>             try {
>>>>>                 pro.load(new FileInputStream("./resources/config.properties"));
>>>>>             } catch (Exception e) {
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>             String inputFile = pro.getProperty("input");
>>>>>             // map the csv file; keep only columns 1, 2, 40 and 41 of the
>>>>>             // 58-column GDELT rows (an id, the date, and the two coordinates)
>>>>>             return env.readCsvFile(inputFile)
>>>>>                 .ignoreInvalidLines()
>>>>>                 .fieldDelimiter('\u0009')
>>>>>                 //.fieldDelimiter("\t")
>>>>>                 //.lineDelimiter("\n")
>>>>>                 .includeFields(
>>>>>                     true,  true,  false, false, false, false, false, false, false, false, //  1-10
>>>>>                     false, false, false, false, false, false, false, false, false, false, // 11-20
>>>>>                     false, false, false, false, false, false, false, false, false, false, // 21-30
>>>>>                     false, false, false, false, false, false, false, false, false, true,  // 31-40
>>>>>                     true,  false, false, false, false, false, false, false, false, false, // 41-50
>>>>>                     false, false, false, false, false, false, false, false)               // 51-58
>>>>>                 //.includeFields(true, true, true, true)
>>>>>                 .types(String.class, Long.class, Double.class, Double.class)
>>>>>                 .map(new TuplePointConverter());
>>>>>         }
>>>>>
>>>>>         private static final class TuplePointConverter implements
>>>>>                 MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>>>>>
>>>>>             private static final long serialVersionUID = 3485560278562719538L;
>>>>>
>>>>>             public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t)
>>>>>                     throws Exception {
>>>>>                 return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>>>>>             // load properties
>>>>>             Properties pro = new Properties();
>>>>>             try {
>>>>>                 pro.load(new FileInputStream("./resources/config.properties"));
>>>>>             } catch (Exception e) {
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>             String seedFile = pro.getProperty("seed.file");
>>>>>             boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>>>             // get seed points from file, or create them randomly
>>>>>             if (seedFlag || !(new File(seedFile + "-1").exists())) {
>>>>>                 Seeding.randomSeeding();
>>>>>             }
>>>>>             // map the csv file
>>>>>             return env.readCsvFile(seedFile + "-1")
>>>>>                 .lineDelimiter("\n")
>>>>>                 .fieldDelimiter('\u0009')
>>>>>                 //.fieldDelimiter("\t")
>>>>>                 .includeFields(true, true, true, true)
>>>>>                 .types(Integer.class, Double.class, Double.class, Long.class)
>>>>>                 .map(new TupleCentroidConverter());
>>>>>         }
>>>>>
>>>>>         private static final class TupleCentroidConverter implements
>>>>>                 MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>>>>>
>>>>>             private static final long serialVersionUID = -1046538744363026794L;
>>>>>
>>>>>             public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t)
>>>>>                     throws Exception {
>>>>>                 return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>
>>>>>> Concerning your first problem, that you only see one resulting
>>>>>> centroid: your code looks good, modulo the parts you haven't posted.
>>>>>>
>>>>>> However, your problem could simply be caused by a bad selection of
>>>>>> initial centroids. If, for example, all centroids except for one don't
>>>>>> get any points assigned, then only one centroid will survive the
>>>>>> iteration step. How do you select the initial centroids?
>>>>>>
>>>>>> To check that all centroids are read, you can print the contents of
>>>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>>>> centroids after each iteration step. In local mode you can then
>>>>>> observe the computation.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
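One way to do the per-iteration printing Till describes, assuming the FlinkMain code quoted above (the helper is a sketch, not tested): an identity MapFunction that logs every centroid flowing past.

// org.apache.flink.api.common.functions.MapFunction, as already used in FlinkMain
public static class CentroidLogger
        implements MapFunction<GeoTimeDataCenter, GeoTimeDataCenter> {
    public GeoTimeDataCenter map(GeoTimeDataCenter c) {
        System.out.println("centroid: " + c); // relies on a useful toString()
        return c;                             // pass the value through unchanged
    }
}

Chaining .map(new CentroidLogger()) onto newCentroids inside the loop shows, when running in local mode, how many distinct centroids actually survive each iteration.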
>>>>>>
>>>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> This problem should not depend on any user code. There are no
>>>>>>> user-code dependent actors in Flink.
>>>>>>>
>>>>>>> Is there more stack trace that you can send us? It looks like the
>>>>>>> core exception that is causing the issue is not part of the stack
>>>>>>> trace.
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flink community,
>>>>>>>>
>>>>>>>> I have implemented k-means for clustering temporal geo data. I use
>>>>>>>> the following GitHub project and my own data structure:
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>>>>
>>>>>>>> Now I have the problem that Flink reads the centroids from a file
>>>>>>>> and then continues working in parallel. If I look at the results, I
>>>>>>>> have the feeling that the program loads only one centroid point.
>>>>>>>>
>>>>>>>> I work with Flink 0.8.1. If I update to 0.9 milestone 1, I get the
>>>>>>>> following exception:
>>>>>>>>
>>>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>>>     ... 9 more
>>>>>>>>
>>>>>>>> How can I tell Flink that it should wait for the dataset to load,
>>>>>>>> and what does this exception mean?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Paul