Thanks for your message.

Could you give me an example of the GroupReduceFunction?
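
A minimal sketch of the idea behind a GroupReduceFunction, written as plain Java so that it runs without Flink on the classpath. It assumes a hypothetical `Point` class standing in for `GeoTimeDataTupel`, and it uses a plain arithmetic mean of latitude and longitude as a simplification (the thread's `Geometry.getGeoCenterOf` could be called on the full list instead). As far as I know, in Flink the loop would sit inside `GroupReduceFunction.reduce(Iterable<IN> values, Collector<OUT> out)`, which receives all values of one group at once.

```java
import java.util.Arrays;
import java.util.List;

/** Averages a whole group in one pass, the way a GroupReduceFunction body could. */
final class GroupMeanSketch {

    /** Hypothetical stand-in for GeoTimeDataTupel: latitude, longitude, time. */
    static final class Point {
        final double lat;
        final double lon;
        final long time;

        Point(double lat, double lon, long time) {
            this.lat = lat;
            this.lon = lon;
            this.time = time;
        }
    }

    /** Sums every point of one cluster first, then divides once by the count. */
    static Point mean(Iterable<Point> cluster) {
        double latSum = 0.0;
        double lonSum = 0.0;
        long timeSum = 0L;
        long count = 0L;
        for (Point p : cluster) {
            latSum += p.lat;
            lonSum += p.lon;
            timeSum += p.time;
            count++;
        }
        if (count == 0) {
            throw new IllegalArgumentException("empty cluster");
        }
        // Note: the time average uses integer division, as in the posted code.
        return new Point(latSum / count, lonSum / count, timeSum / count);
    }

    public static void main(String[] args) {
        List<Point> cluster = Arrays.asList(
                new Point(1, 10, 100), new Point(2, 20, 200),
                new Point(3, 30, 300), new Point(4, 40, 400));
        Point c = mean(cluster);
        System.out.println(c.lat + " " + c.lon + " " + c.time);
    }
}
```

Because the division happens only once, after all points have been seen, the result does not depend on the order in which the points arrive.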

2015-05-22 23:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> There are two ways to do that:
>
> 1) You use a GroupReduceFunction, which gives you an iterator over all
> points similar to Hadoop's ReduceFunction.
> 2) You use the ReduceFunction to compute the sum and the count at the same
> time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
> final division.
>
> I'd go with the first choice. It's easier.
>
> Best, Fabian
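
The second option can be sketched without Flink as a sum-and-count accumulator. This is only an illustration under stated assumptions: `Acc`, `combine`, and `finish` are hypothetical names, and a single `double` stands in for a full point. `combine` plays the role of the ReduceFunction and `finish` the role of the MapFunction that does the final division.

```java
import java.util.List;

/** Sum-and-count accumulation followed by one final division. */
final class SumCountSketch {

    /** Hypothetical accumulator: coordinate sum plus number of points folded in. */
    static final class Acc {
        final double sum;
        final long count;

        Acc(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Pairwise combine step: add the sums and add the counts, never divide here. */
    static Acc combine(Acc a, Acc b) {
        return new Acc(a.sum + b.sum, a.count + b.count);
    }

    /** Final step after the reduce: divide the total sum by the total count. */
    static double finish(Acc acc) {
        return acc.sum / acc.count;
    }

    /** Folds the values pairwise; every single value starts with count 1. */
    static double mean(List<Double> values) {
        Acc acc = null;
        for (double v : values) {
            Acc next = new Acc(v, 1L);
            acc = (acc == null) ? next : combine(acc, next);
        }
        if (acc == null) {
            throw new IllegalArgumentException("no values");
        }
        return finish(acc);
    }
}
```

Since `combine` only adds, it can be applied to the points in any grouping, which is what a pairwise reduce needs.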
>
> 2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1...@googlemail.com>:
>
>>  Good evening,
>>
>> sorry, my English is not the best.
>>
>> To compute the new centroid, I want to sum all points of the cluster and
>> form the new center from that sum.
>> In my other implementation I first sum all points and at the end
>> divide by the number of points.
>> For example: (1+2+3+4)/4=2.5
>>
>> In Flink I always reduce two points to one.
>> For the example above: (1+2)/2=1.5 --> (1.5+3)/2=2.25 -->
>> (2.25+4)/2=3.125
>>
>> How can I rewrite my function so that it works like my other
>> implementation?
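
The difference between the two calculations can be reproduced in a few lines of plain Java. This is a sketch with hypothetical method names, using single numbers instead of geo points.

```java
/** Contrasts repeated pairwise averaging with a single sum-then-divide mean. */
final class PairwiseAverageDemo {

    /** Averages the running result with each next value, as a pairwise reduce does. */
    static double pairwiseAverage(double[] values) {
        double result = values[0];
        for (int i = 1; i < values.length; i++) {
            result = (result + values[i]) / 2.0;
        }
        return result;
    }

    /** Sums everything first and divides once by the number of values. */
    static double trueMean(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    public static void main(String[] args) {
        double[] values = {1, 2, 3, 4};
        System.out.println(pairwiseAverage(values)); // prints 3.125
        System.out.println(trueMean(values));        // prints 2.5
    }
}
```

The pairwise result also depends on the order of the values: for 4, 3, 2, 1 it is 1.875, while the true mean stays 2.5.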
>>
>> best regards,
>> paul
>>
>>
>>
>> On 22.05.2015 at 16:52, Stephan Ewen wrote:
>>
>> Sorry, I don't understand the question.
>>
>>  Can you describe a bit better what you mean by "how i can sum all
>> points and share thoug the counter"?
>>
>>  Thanks!
>>
>> On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com>
>> wrote:
>>
>>>   I have fixed a bug in the input reading, but the results are still
>>> different.
>>>
>>> I think I have located the problem: in the other implementation I sum all
>>> geo points/time points and divide by the count,
>>>  but in Flink I sum two points and divide by two, then add the next one, and so on.
>>>
>>>  The method is the following:
>>>
>>> // sums and counts point coordinates
>>>     private static final class CentroidAccumulator
>>>             implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>
>>>         private static final long serialVersionUID = -4868797820391121771L;
>>>
>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>                 Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
>>>                     addAndDiv(val1.f0, val1.f1, val2.f1));
>>>         }
>>>     }
>>>
>>>     private static GeoTimeDataTupel addAndDiv(int clusterid,
>>>             GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>>>         long time = (input1.getTime()+input2.getTime())/2;
>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>         list.add(input1.getGeo());
>>>         list.add(input2.getGeo());
>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>
>>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>>     }
>>>
>>>  How can I sum all points and divide by the count?
>>>
>>>
>>> 2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>>>
>>>>  Hi,
>>>>  if I print the centroids, all of them are shown in the output. I have
>>>> implemented k-means with MapReduce and Spark; with the same input, both
>>>> give the same output. But in Flink I get a single cluster as output for
>>>> this input set. (I use CSV files from the GDELT project.)
>>>>
>>>>  Here is my class:
>>>>
>>>> public class FlinkMain {
>>>>
>>>>     public static void main(String[] args) {
>>>>         // load properties
>>>>         Properties pro = new Properties();
>>>>         try {
>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>         int maxIteration = 1; // Integer.parseInt(pro.getProperty("maxiterations"));
>>>>         String outputPath = pro.getProperty("flink.output");
>>>>         // set up execution environment
>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>         // get input points
>>>>         DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
>>>>         DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
>>>>         // set number of bulk iterations for KMeans algorithm
>>>>         IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
>>>>         DataSet<GeoTimeDataCenter> newCentroids = points
>>>>             // compute closest centroid for each point
>>>>             .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
>>>>             // count and sum point coordinates for each centroid
>>>>             .groupBy(0).reduce(new CentroidAccumulator())
>>>>             // compute new centroids from point counts and coordinate sums
>>>>             .map(new CentroidAverager());
>>>>         // feed new centroids back into next iteration
>>>>         DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
>>>>         DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
>>>>             // assign points to final clusters
>>>>             .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
>>>>         // emit result
>>>>         clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
>>>>         finalCentroids.writeAsText(outputPath+"/centers");//print();
>>>>         // execute program
>>>>         try {
>>>>             env.execute("KMeans Flink");
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>>     private static final class SelectNearestCenter
>>>>             extends RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {
>>>>
>>>>         private static final long serialVersionUID = -2729445046389350264L;
>>>>         private Collection<GeoTimeDataCenter> centroids;
>>>>
>>>>         @Override
>>>>         public void open(Configuration parameters) throws Exception {
>>>>             this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
>>>>         }
>>>>
>>>>         @Override
>>>>         public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
>>>>             double minDistance = Double.MAX_VALUE;
>>>>             int closestCentroidId = -1;
>>>>
>>>>             // check all cluster centers
>>>>             for (GeoTimeDataCenter centroid : centroids) {
>>>>                 // compute distance
>>>>                 double distance = Distance.ComputeDist(point, centroid);
>>>>                 // update nearest cluster if necessary
>>>>                 if (distance < minDistance) {
>>>>                     minDistance = distance;
>>>>                     closestCentroidId = centroid.getId();
>>>>                 }
>>>>             }
>>>>             // emit a new record with the center id and the data point
>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
>>>>         }
>>>>     }
>>>>
>>>>     // sums and counts point coordinates
>>>>     private static final class CentroidAccumulator
>>>>             implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {
>>>>
>>>>         private static final long serialVersionUID = -4868797820391121771L;
>>>>
>>>>         public Tuple2<Integer, GeoTimeDataTupel> reduce(
>>>>                 Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
>>>>             return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1, val2.f1));
>>>>         }
>>>>     }
>>>>
>>>>     private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
>>>>         long time = (input1.getTime()+input2.getTime())/2;
>>>>         List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
>>>>         list.add(input1.getGeo());
>>>>         list.add(input2.getGeo());
>>>>         LatLongSeriable geo = Geometry.getGeoCenterOf(list);
>>>>
>>>>         return new GeoTimeDataTupel(geo,time,"POINT");
>>>>     }
>>>>
>>>>     // computes new centroid from coordinate sum and count of points
>>>>     private static final class CentroidAverager
>>>>             implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {
>>>>
>>>>         private static final long serialVersionUID = -2687234478847261803L;
>>>>
>>>>         public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
>>>>             return new GeoTimeDataCenter(value.f0, value.f1.getGeo(), value.f1.getTime());
>>>>         }
>>>>     }
>>>>
>>>>     private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
>>>>         // load properties
>>>>         Properties pro = new Properties();
>>>>         try {
>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>         String inputFile = pro.getProperty("input");
>>>>         // map csv file
>>>>         return env.readCsvFile(inputFile)
>>>>             .ignoreInvalidLines()
>>>>             .fieldDelimiter('\u0009')
>>>>             //.fieldDelimiter("\t")
>>>>             //.lineDelimiter("\n")
>>>>             .includeFields(
>>>>                     true, true, false, false, false, false, false, false, false, false, false,
>>>>                     false, false, false, false, false, false, false, false, false, false,
>>>>                     false, false, false, false, false, false, false, false, false, false,
>>>>                     false, false, false, false, false, false, false, false, true, true,
>>>>                     false, false, false, false, false, false, false, false, false, false,
>>>>                     false, false, false, false, false, false, false, false)
>>>>             //.includeFields(true,true,true,true)
>>>>             .types(String.class, Long.class, Double.class, Double.class)
>>>>             .map(new TuplePointConverter());
>>>>     }
>>>>
>>>>     private static final class TuplePointConverter
>>>>             implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {
>>>>
>>>>         private static final long serialVersionUID = 3485560278562719538L;
>>>>
>>>>         public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
>>>>             return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
>>>>         }
>>>>     }
>>>>
>>>>     private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
>>>>         // load properties
>>>>         Properties pro = new Properties();
>>>>         try {
>>>>             pro.load(new FileInputStream("./resources/config.properties"));
>>>>         } catch (Exception e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>         String seedFile = pro.getProperty("seed.file");
>>>>         boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
>>>>         // get points from file or random
>>>>         if (seedFlag || !(new File(seedFile+"-1").exists())) {
>>>>             Seeding.randomSeeding();
>>>>         }
>>>>         // map csv file
>>>>         return env.readCsvFile(seedFile+"-1")
>>>>             .lineDelimiter("\n")
>>>>             .fieldDelimiter('\u0009')
>>>>             //.fieldDelimiter("\t")
>>>>             .includeFields(true, true, true, true)
>>>>             .types(Integer.class, Double.class, Double.class, Long.class)
>>>>             .map(new TupleCentroidConverter());
>>>>     }
>>>>
>>>>     private static final class TupleCentroidConverter
>>>>             implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {
>>>>
>>>>         private static final long serialVersionUID = -1046538744363026794L;
>>>>
>>>>         public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
>>>>             return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> 2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>>>>
>>>>> Concerning your first problem that you only see one resulting
>>>>> centroid, your code looks good modulo the parts you haven't posted.
>>>>>
>>>>>  However, your problem could simply be caused by a bad selection of
>>>>> initial centroids. If, for example, all centroids except for one don't get
>>>>> any points assigned, then only one centroid will survive the iteration
>>>>> step. How do you do it?
>>>>>
>>>>>  To check that all centroids are read you can print the contents of
>>>>> the centroids DataSet. Furthermore, you can simply println the new
>>>>> centroids after each iteration step. In local mode you can then observe
>>>>> the computation.
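
The effect of a bad initial selection can be illustrated with a small plain-Java sketch. It assumes one-dimensional points and hypothetical names (`nearest`, `step`), so it is not the Flink job itself: after one k-means step, only centroids that received at least one point produce a new center.

```java
import java.util.Map;
import java.util.TreeMap;

/** Shows how centroids without assigned points vanish after one k-means step. */
final class EmptyClusterDemo {

    /** Returns the index of the centroid closest to the point (1-D for brevity). */
    static int nearest(double point, double[] centroids) {
        int best = -1;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centroids.length; i++) {
            double d = Math.abs(point - centroids[i]);
            if (d < bestDist) {
                bestDist = d;
                best = i;
            }
        }
        return best;
    }

    /** One step: group points by nearest centroid id, then average each group. */
    static Map<Integer, Double> step(double[] points, double[] centroids) {
        // Per centroid id: element 0 holds the sum, element 1 holds the count.
        Map<Integer, double[]> sumAndCount = new TreeMap<>();
        for (double p : points) {
            double[] acc = sumAndCount.computeIfAbsent(nearest(p, centroids), k -> new double[2]);
            acc[0] += p;
            acc[1] += 1;
        }
        Map<Integer, Double> next = new TreeMap<>();
        for (Map.Entry<Integer, double[]> e : sumAndCount.entrySet()) {
            next.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return next;
    }
}
```

With points 1, 2, 3, 4 and initial centroids 2, 100, 200, every point is assigned to the first centroid, so `step` returns a single entry and the other two centroids are gone.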
>>>>>
>>>>>  Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>>  This problem should not depend on any user code. There are no
>>>>>> user-code dependent actors in Flink.
>>>>>>
>>>>>>  Is there more stack trace that you can send us? It looks like the
>>>>>> core exception that is causing the issue is not part of the
>>>>>> stack trace.
>>>>>>
>>>>>>  Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 21, 2015 at 11:11 AM, Pa Rö <
>>>>>> paul.roewer1...@googlemail.com> wrote:
>>>>>>
>>>>>>>    Hi Flink community,
>>>>>>>
>>>>>>>  I have implemented k-means for clustering temporal geo data. I use
>>>>>>> the following GitHub project with my own data structure:
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
>>>>>>>
>>>>>>>  Now I have the problem that Flink reads the centroids from a file and
>>>>>>> then continues working in parallel. Looking at the results, I have the
>>>>>>> feeling that the program loads only one centroid point.
>>>>>>>
>>>>>>>  I work with Flink 0.8.1; if I update to 0.9 milestone 1, I get the
>>>>>>> following exception:
>>>>>>> ERROR actor.OneForOneStrategy: exception during creation
>>>>>>> akka.actor.ActorInitializationException: exception during creation
>>>>>>>     at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>>>>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>>>>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>>>>>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>>>>>     at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>>>>>>     at akka.actor.Props.newActor(Props.scala:337)
>>>>>>>     at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>>>>>     at akka.actor.ActorCell.create(ActorCell.scala:560)
>>>>>>>     ... 9 more
>>>>>>>
>>>>>>>  How can I tell Flink to wait until the dataset has been loaded,
>>>>>>> and what does this exception mean?
>>>>>>>
>>>>>>>  best regards,
>>>>>>>  paul
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>
