Sure,

here some pseudo code:

public class CentroidMover extends GroupReduceFunction<Point, Centroid> {

  public void reduce(Iterable<Point> points, Collector<Centroid> out) {
    int cnt = 0;
    Centroid sum = new Centroid(0,0);
    for(Point p : points) {
      sum = sum + p // (your addition logic goes here)
      cnt++;
    }
    out.collect(sum / cnt); // your division logic goes here.
}

This function computes the sum and the count of a group and the final
average.

Is this what you are looking for?

2015-05-26 11:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> thanks for your message,
>
> maybe you can give me a exsample for the GroupReduceFunction?
>
> 2015-05-22 23:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> There are two ways to do that:
>>
>> 1) You use a GroupReduceFunction, which gives you an iterator over all
>> points similar to Hadoop's ReduceFunction.
>> 2) You use the ReduceFunction to compute the sum and the count at the
>> same time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
>> final division.
>>
>> I'd go with the first choice. It's easier.
>>
>> Best, Fabian
>>
>> 2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1...@googlemail.com>:
>>
>>>  good evening,
>>>
>>> sorry, my english is not the best.
>>>
>>> by comupte the new centroid, i will sum all points of the cluster and
>>> form the new center.
>>> in my other implementation firstly i sum all point and at the end i
>>> divides by number of points.
>>> to example: (1+2+3+4)/4=2,5
>>>
>>> at flink i reduce always two point to one,
>>> for the example upstairs: (1+2)/2=1,5 --> (1,5+3)/2=2,25 -->
>>> (2,25+4)=3,125
>>>
>>> how can i rewrite my function so, that it work like my other
>>> implementation?
>>>
>>> best regards,
>>> paul
>>>
>>>
>>>
>>> Am 22.05.2015 um 16:52 schrieb Stephan Ewen:
>>>
>>> Sorry, I don't understand the question.
>>>
>>>  Can you describe a bit better what you mean with "how i can sum all
>>> points and share thoug the counter" ?
>>>
>>>  Thanks!
>>>
>>> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com>
>>> wrote:
>>>
>>>>   i have fix a bug at the input reading, but the results are still
>>>> different.
>>>>
>>>> i think i have local the problem, in the other implementation i sum all
>>>> geo points/time points and share thougt the counter.
>>>>  but in flink i sum two points and share thougt two, and sum the next...
>>>>
>>>>  the method is the following:
>>>>
>>>> // sums and counts point coordinates
>>>>     private static final class CentroidAccumulator implements
>>>> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>
>>>>         private static final long serialVersionUID =
>>>> -4868797820391121771L;
>>>>
>>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer,
>>>> GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>> addAndDiv(val1.f0,val1.f1,val2.f1));
>>>>         }
>>>>     }
>>>>
>>>>     private static GeoTimeDataTupel addAndDiv(int
>>>> clusterid,GeoTimeDataTupel input1, GeoTimeDataTupel input2){
>>>>         long time = (input1.getTime()+input2.getTime())/2;
>>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>         list.add(input1.getGeo());
>>>>         list.add(input2.getGeo());
>>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>
>>>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>>>     }
>>>>
>>>>  how i can sum all points and share thoug the counter?
>>>>
>>>>
>>>> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>>>
>>>>>  hi,
>>>>>  if i print the centroids all are show in the output. i have implement
>>>>> k means with map reduce und spark. by same input, i get the same output.
>>>>> but in flink i get a one cluster output with this input set. (i use csv
>>>>> files from the GDELT projekt)
>>>>>
>>>>>  here my class:
>>>>>
>>>>> public class FlinkMain {
>>>>>
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         //load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new
>>>>> FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         int maxIteration =
>>>>> 1;//Integer.parseInt(pro.getProperty("maxiterations"));
>>>>>         String outputPath = pro.getProperty("flink.output");
>>>>>         // set up execution environment
>>>>>         ExecutionEnvironment env =
>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>         // get input points
>>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>>>         // set number of bulk iterations for KMeans algorithm
>>>>>         IterativeDataSet<GeoTimeDataCenter> loop =
>>>>> centroids.iterate(maxIteration);
>>>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>>>             // compute closest centroid for each point
>>>>>             .map(new SelectNearestCenter()).withBroadcastSet(loop,
>>>>> "centroids")
>>>>>             // count and sum point coordinates for each centroid
>>>>>             .groupBy(0).reduce(new CentroidAccumulator())
>>>>>             // compute new centroids from point counts and coordinate
>>>>> sums
>>>>>             .map(new CentroidAverager());
>>>>>         // feed new centroids back into next iteration
>>>>>         DataSet<GeoTimeDataCenter> finalCentroids =
>>>>> loop.closeWith(newCentroids);
>>>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints =
>>>>> points
>>>>>             // assign points to final clusters
>>>>>             .map(new
>>>>> SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>>         // emit result
>>>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>>>         // execute program
>>>>>         try {
>>>>>             env.execute("KMeans Flink");
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>>
>>>>>      private static final class SelectNearestCenter extends
>>>>> RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>> {
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> -2729445046389350264L;
>>>>>         private Collection<GeoTimeDataCenter> centroids;
>>>>>
>>>>>         @Override
>>>>>         public void open(Configuration parameters) throws Exception {
>>>>>             this.centroids =
>>>>> getRuntimeContext().getBroadcastVariable("centroids");
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel
>>>>> point) throws Exception {
>>>>>             double minDistance = Double.MAX_VALUE;
>>>>>             int closestCentroidId= -1;
>>>>>
>>>>>             // check all cluster centers
>>>>>             for(GeoTimeDataCenter centroid : centroids) {
>>>>>                 // compute distance
>>>>>                 double distance = Distance.ComputeDist(point,
>>>>> centroid);
>>>>>                 // update nearest cluster if necessary
>>>>>                 if(distance < minDistance) {
>>>>>                     minDistance = distance;
>>>>>                     closestCentroidId = centroid.getId();
>>>>>                 }
>>>>>             }
>>>>>             // emit a new record with the center id and the data point
>>>>>             return new Tuple2<Integer,
>>>>> GeoTimeDataTupel>(closestCentroidId, point);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     // sums and counts point coordinates
>>>>>     private static final class CentroidAccumulator implements
>>>>> ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> -4868797820391121771L;
>>>>>
>>>>>         public Tuple2<Integer, GeoTimeDataTupel>
>>>>> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer,
>>>>> GeoTimeDataTupel> val2) {
>>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>>> addAndDiv(val1.f1,val2.f1));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
>>>>> GeoTimeDataTupel input2){
>>>>>         long time = (input1.getTime()+input2.getTime())/2;
>>>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>>         list.add(input1.getGeo());
>>>>>         list.add(input2.getGeo());
>>>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>>
>>>>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>>>>     }
>>>>>
>>>>>     // computes new centroid from coordinate sum and count of points
>>>>>     private static final class CentroidAverager implements
>>>>> MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> -2687234478847261803L;
>>>>>
>>>>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel>
>>>>> value) {
>>>>>             return new GeoTimeDataCenter(value.f0,
>>>>> value.f1.getGeo(),value.f1.getTime());
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static DataSet<GeoTimeDataTupel>
>>>>> getPointDataSet(ExecutionEnvironment env) {
>>>>>         // load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new
>>>>> FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         String inputFile = pro.getProperty("input");
>>>>>         // map csv file
>>>>>         return env.readCsvFile(inputFile)
>>>>>             .ignoreInvalidLines()
>>>>>             .fieldDelimiter('\u0009')
>>>>>             //.fieldDelimiter("\t")
>>>>>             //.lineDelimiter("\n")
>>>>>             .includeFields(true, true, false, false, false, false,
>>>>> false, false, false, false, false
>>>>>                     , false, false, false, false, false, false, false,
>>>>> false, false, false
>>>>>                     , false, false, false, false, false, false, false,
>>>>> false, false, false
>>>>>                     , false, false, false, false, false, false, false,
>>>>> false, true, true
>>>>>                     , false, false, false, false, false, false, false,
>>>>> false, false, false
>>>>>                     , false, false, false, false, false, false, false,
>>>>> false)
>>>>>             //.includeFields(true,true,true,true)
>>>>>             .types(String.class, Long.class, Double.class,
>>>>> Double.class)
>>>>>             .map(new TuplePointConverter());
>>>>>     }
>>>>>
>>>>>     private static final class TuplePointConverter implements
>>>>> MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel>{
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> 3485560278562719538L;
>>>>>
>>>>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double,
>>>>> Double> t) throws Exception {
>>>>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2,
>>>>> t.f3), t.f1, t.f0);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static DataSet<GeoTimeDataCenter>
>>>>> getCentroidDataSet(ExecutionEnvironment env) {
>>>>>         // load properties
>>>>>         Properties pro = new Properties();
>>>>>         try {
>>>>>             pro.load(new
>>>>> FileInputStream("./resources/config.properties"));
>>>>>         } catch (Exception e) {
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>         String seedFile = pro.getProperty("seed.file");
>>>>>         boolean seedFlag =
>>>>> Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>>>         // get points from file or random
>>>>>         if(seedFlag || !(new File(seedFile+"-1").exists())) {
>>>>>             Seeding.randomSeeding();
>>>>>         }
>>>>>         // map csv file
>>>>>         return env.readCsvFile(seedFile+"-1")
>>>>>             .lineDelimiter("\n")
>>>>>             .fieldDelimiter('\u0009')
>>>>>             //.fieldDelimiter("\t")
>>>>>             .includeFields(true, true, true, true)
>>>>>             .types(Integer.class, Double.class, Double.class,
>>>>> Long.class)
>>>>>             .map(new TupleCentroidConverter());
>>>>>     }
>>>>>
>>>>>     private static final class TupleCentroidConverter implements
>>>>> MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter>{
>>>>>
>>>>>         private static final long serialVersionUID =
>>>>> -1046538744363026794L;
>>>>>
>>>>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double,
>>>>> Long> t) throws Exception {
>>>>>             return new GeoTimeDataCenter(t.f0,new
>>>>> LatLongSeriable(t.f1, t.f2), t.f3);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>>
>>>>>> Concerning your first problem that you only see one resulting
>>>>>> centroid, your code looks good modulo the parts you haven't posted.
>>>>>>
>>>>>>  However, your problem could simply be caused by a bad selection of
>>>>>> initial centroids. If, for example, all centroids except for one don't 
>>>>>> get
>>>>>> any points assigned, then only one centroid will survive the iteration
>>>>>> step. How do you do it?
>>>>>>
>>>>>>  To check that all centroids are read you can print the contents of
>>>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>>>> centroids after each iteration step. In local mode you can then observe 
>>>>>> the
>>>>>> computation.
>>>>>>
>>>>>>  Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>>  This problem should not depend on any user code. There are no
>>>>>>> user-code dependent actors in Flink.
>>>>>>>
>>>>>>>  Is there more stack trace that you can send us? It looks like it
>>>>>>> misses the core exception that is causing the issue is not part of the
>>>>>>> stack trace.
>>>>>>>
>>>>>>>  Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <
>>>>>>> paul.roewer1...@googlemail.com> wrote:
>>>>>>>
>>>>>>>>    hi flink community,
>>>>>>>>
>>>>>>>>  i have implement k-means for clustering temporal geo data. i use
>>>>>>>> the following github project and my own data structure:
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>>>>
>>>>>>>>  not i have the problem, that flink read the centroids from file
>>>>>>>> and work parallel futher. if i look at the results, i have the feeling,
>>>>>>>> that the prgramm load only one centroid point.
>>>>>>>>
>>>>>>>>  i work with flink 0.8.1, if i update to 0.9 milestone 1 i get the
>>>>>>>> following exception:
>>>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>>>     at
>>>>>>>> akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>>>     at
>>>>>>>> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>>>     at
>>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>     at
>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at
>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at
>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at
>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>     at
>>>>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>>>>     at
>>>>>>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>>>     at
>>>>>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at
>>>>>>>> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>>>     ... 9 more
>>>>>>>>
>>>>>>>>  how can i say flink, that it should be wait for loading dataset,
>>>>>>>> and what say this exception?
>>>>>>>>
>>>>>>>>  best regards,
>>>>>>>>  paul
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Reply via email to