Thanks for your message. Maybe you can give me an example for the GroupReduceFunction?
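
A minimal, untested sketch of what option 1) from Fabian's reply below could look like. It reuses the GeoTimeDataTupel, LatLongSeriable and Geometry types from the code later in this thread, and assumes Geometry.getGeoCenterOf also handles more than two points:

    // A GroupReduceFunction sees ALL points of one cluster at once, so the
    // division happens exactly once, by the total count.
    private static final class CentroidGroupReducer implements
            GroupReduceFunction<Tuple2<Integer, GeoTimeDataTupel>,
                                Tuple2<Integer, GeoTimeDataTupel>> {

        @Override
        public void reduce(Iterable<Tuple2<Integer, GeoTimeDataTupel>> points,
                Collector<Tuple2<Integer, GeoTimeDataTupel>> out) {
            int clusterId = -1;
            long timeSum = 0L;
            long count = 0L;
            List<LatLongSeriable> geos = new ArrayList<LatLongSeriable>();
            // iterate over all points of one cluster, like Hadoop's reduce
            for (Tuple2<Integer, GeoTimeDataTupel> p : points) {
                clusterId = p.f0;
                timeSum += p.f1.getTime();
                geos.add(p.f1.getGeo());
                count++;
            }
            // one division by the total number of points, like (1+2+3+4)/4
            out.collect(new Tuple2<Integer, GeoTimeDataTupel>(clusterId,
                    new GeoTimeDataTupel(Geometry.getGeoCenterOf(geos),
                            timeSum / count, "POINT")));
        }
    }

It would be plugged in with reduceGroup() instead of reduce():

    .groupBy(0).reduceGroup(new CentroidGroupReducer())
    .map(new CentroidAverager())

(Needed imports: org.apache.flink.api.common.functions.GroupReduceFunction and org.apache.flink.util.Collector.)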
2015-05-22 23:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

There are two ways to do that:

1) You use a GroupReduceFunction, which gives you an iterator over all
   points, similar to Hadoop's ReduceFunction.
2) You use the ReduceFunction to compute the sum and the count at the same
   time (e.g., in two fields of a Tuple2) and use a MapFunction to do the
   final division.

I'd go with the first choice. It's easier.

Best, Fabian

2015-05-22 23:09 GMT+02:00 Paul Röwer <paul.roewer1...@googlemail.com>:

Good evening,

sorry, my English is not the best.

To compute the new centroid, I want to sum all points of the cluster and
form the new center. In my other implementation, I first sum all points
and divide by the number of points at the end, for example:
(1+2+3+4)/4 = 2.5

In Flink I always reduce two points to one, so for the example above:
(1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125

How can I rewrite my function so that it works like my other
implementation?

Best regards,
Paul

On 22.05.2015 at 16:52, Stephan Ewen wrote:

Sorry, I don't understand the question.

Can you describe a bit better what you mean by "sum all points and divide
by the counter"?

Thanks!

On Fri, May 22, 2015 at 2:06 PM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

I have fixed a bug in the input reading, but the results are still
different.

I think I have localized the problem: in the other implementation I sum
all geo points/time points and divide by the counter, but in Flink I sum
two points, divide by two, sum the next one, and so on...

The method is the following:

    // sums and counts point coordinates
    private static final class CentroidAccumulator implements
            ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;

        public Tuple2<Integer, GeoTimeDataTupel> reduce(
                Tuple2<Integer, GeoTimeDataTupel> val1,
                Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
                    addAndDiv(val1.f0, val1.f1, val2.f1));
        }
    }

    private static GeoTimeDataTupel addAndDiv(int clusterid,
            GeoTimeDataTupel input1, GeoTimeDataTupel input2) {
        long time = (input1.getTime() + input2.getTime()) / 2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo, time, "POINT");
    }

How can I sum all points and divide by the counter?
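
The underlying issue: pairwise averaging is not associative, but sums are. A rough, untested sketch of option 2) from Fabian's reply, which carries the coordinate sums, the summed time and a count through the reduce and divides only once afterwards. The getLat()/getLon() accessors on LatLongSeriable are assumptions about that class, and `pointsWithCentroid` is a placeholder for the output of the SelectNearestCenter map shown further down; Tuple3 comes from org.apache.flink.api.java.tuple:

    // attach a count of 1 to every (clusterId, point) pair, sum pairwise,
    // divide once at the end
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> means = pointsWithCentroid
        .map(new MapFunction<Tuple2<Integer, GeoTimeDataTupel>,
                             Tuple3<Integer, GeoTimeDataTupel, Long>>() {
            public Tuple3<Integer, GeoTimeDataTupel, Long> map(
                    Tuple2<Integer, GeoTimeDataTupel> v) {
                return new Tuple3<Integer, GeoTimeDataTupel, Long>(v.f0, v.f1, 1L);
            }
        })
        .groupBy(0)
        // summing is associative, so reducing two partial sums is safe here,
        // unlike pairwise averaging
        .reduce(new ReduceFunction<Tuple3<Integer, GeoTimeDataTupel, Long>>() {
            public Tuple3<Integer, GeoTimeDataTupel, Long> reduce(
                    Tuple3<Integer, GeoTimeDataTupel, Long> a,
                    Tuple3<Integer, GeoTimeDataTupel, Long> b) {
                LatLongSeriable sum = new LatLongSeriable(
                        a.f1.getGeo().getLat() + b.f1.getGeo().getLat(),
                        a.f1.getGeo().getLon() + b.f1.getGeo().getLon());
                return new Tuple3<Integer, GeoTimeDataTupel, Long>(a.f0,
                        new GeoTimeDataTupel(sum,
                                a.f1.getTime() + b.f1.getTime(), "POINT"),
                        a.f2 + b.f2);
            }
        })
        // single division by the total number of points
        .map(new MapFunction<Tuple3<Integer, GeoTimeDataTupel, Long>,
                             Tuple2<Integer, GeoTimeDataTupel>>() {
            public Tuple2<Integer, GeoTimeDataTupel> map(
                    Tuple3<Integer, GeoTimeDataTupel, Long> v) {
                LatLongSeriable mean = new LatLongSeriable(
                        v.f1.getGeo().getLat() / v.f2,
                        v.f1.getGeo().getLon() / v.f2);
                return new Tuple2<Integer, GeoTimeDataTupel>(v.f0,
                        new GeoTimeDataTupel(mean,
                                v.f1.getTime() / v.f2, "POINT"));
            }
        });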
2015-05-22 9:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

Hi,

if I print the centroids, all of them show up in the output. I have
implemented k-means with MapReduce and Spark; with the same input, I get
the same output. But in Flink I get a one-cluster output with this input
set. (I use CSV files from the GDELT project.)

Here is my class:

    public class FlinkMain {

        public static void main(String[] args) {
            // load properties
            Properties pro = new Properties();
            try {
                pro.load(new FileInputStream("./resources/config.properties"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            int maxIteration = 1; //Integer.parseInt(pro.getProperty("maxiterations"));
            String outputPath = pro.getProperty("flink.output");
            // set up execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // get input points
            DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
            DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
            // set number of bulk iterations for KMeans algorithm
            IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
            DataSet<GeoTimeDataCenter> newCentroids = points
                // compute closest centroid for each point
                .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
                // count and sum point coordinates for each centroid
                .groupBy(0).reduce(new CentroidAccumulator())
                // compute new centroids from point counts and coordinate sums
                .map(new CentroidAverager());
            // feed new centroids back into next iteration
            DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
            DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
                // assign points to final clusters
                .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
            // emit result
            clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
            finalCentroids.writeAsText(outputPath + "/centers"); //print();
            // execute program
            try {
                env.execute("KMeans Flink");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private static final class SelectNearestCenter extends
                RichMapFunction<GeoTimeDataTupel, Tuple2<Integer, GeoTimeDataTupel>> {

            private static final long serialVersionUID = -2729445046389350264L;
            private Collection<GeoTimeDataCenter> centroids;

            @Override
            public void open(Configuration parameters) throws Exception {
                this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
            }

            @Override
            public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
                double minDistance = Double.MAX_VALUE;
                int closestCentroidId = -1;

                // check all cluster centers
                for (GeoTimeDataCenter centroid : centroids) {
                    // compute distance
                    double distance = Distance.ComputeDist(point, centroid);
                    // update nearest cluster if necessary
                    if (distance < minDistance) {
                        minDistance = distance;
                        closestCentroidId = centroid.getId();
                    }
                }
                // emit a new record with the center id and the data point
                return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
            }
        }

        // sums and counts point coordinates
        private static final class CentroidAccumulator implements
                ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

            private static final long serialVersionUID = -4868797820391121771L;

            public Tuple2<Integer, GeoTimeDataTupel> reduce(
                    Tuple2<Integer, GeoTimeDataTupel> val1,
                    Tuple2<Integer, GeoTimeDataTupel> val2) {
                return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0,
                        addAndDiv(val1.f1, val2.f1));
            }
        }
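        // NOTE (the issue discussed earlier in this thread): reduce() is
        // applied pairwise, so addAndDiv() below computes a chain of pairwise
        // averages rather than the mean of all points in the cluster. See the
        // GroupReduceFunction sketch near the top of the thread for a fix.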
        private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1,
                GeoTimeDataTupel input2) {
            long time = (input1.getTime() + input2.getTime()) / 2;
            List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
            list.add(input1.getGeo());
            list.add(input2.getGeo());
            LatLongSeriable geo = Geometry.getGeoCenterOf(list);

            return new GeoTimeDataTupel(geo, time, "POINT");
        }

        // computes new centroid from coordinate sum and count of points
        private static final class CentroidAverager implements
                MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {

            private static final long serialVersionUID = -2687234478847261803L;

            public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
                return new GeoTimeDataCenter(value.f0,
                        value.f1.getGeo(), value.f1.getTime());
            }
        }

        private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
            // load properties
            Properties pro = new Properties();
            try {
                pro.load(new FileInputStream("./resources/config.properties"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            String inputFile = pro.getProperty("input");
            // map csv file
            return env.readCsvFile(inputFile)
                .ignoreInvalidLines()
                .fieldDelimiter('\u0009')
                //.fieldDelimiter("\t")
                //.lineDelimiter("\n")
                .includeFields(true, true, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false, true, true,
                        false, false, false, false, false, false, false, false, false, false,
                        false, false, false, false, false, false, false, false)
                //.includeFields(true,true,true,true)
                .types(String.class, Long.class, Double.class, Double.class)
                .map(new TuplePointConverter());
        }
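        // Hedged aside: if your Flink version has the
        // CsvReader.includeFields(String mask) overload, a field-mask string
        // may be less error-prone than the boolean list above. The mask below
        // is meant to select the same four columns (1, 2, 40, 41 of 59):
        //
        //     String mask = "11"                              // fields 1-2
        //             + "0000000000" + "0000000000"
        //             + "0000000000" + "0000000"               // 37 zeros (fields 3-39)
        //             + "11"                                   // fields 40-41
        //             + "0000000000" + "00000000";             // 18 zeros (fields 42-59)
        //
        //     ... env.readCsvFile(inputFile).includeFields(mask) ...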
        private static final class TuplePointConverter implements
                MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel> {

            private static final long serialVersionUID = 3485560278562719538L;

            public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
                return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
            }
        }

        private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
            // load properties
            Properties pro = new Properties();
            try {
                pro.load(new FileInputStream("./resources/config.properties"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            String seedFile = pro.getProperty("seed.file");
            boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
            // get points from file or random
            if (seedFlag || !(new File(seedFile + "-1").exists())) {
                Seeding.randomSeeding();
            }
            // map csv file
            return env.readCsvFile(seedFile + "-1")
                .lineDelimiter("\n")
                .fieldDelimiter('\u0009')
                //.fieldDelimiter("\t")
                .includeFields(true, true, true, true)
                .types(Integer.class, Double.class, Double.class, Long.class)
                .map(new TupleCentroidConverter());
        }

        private static final class TupleCentroidConverter implements
                MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter> {

            private static final long serialVersionUID = -1046538744363026794L;

            public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
                return new GeoTimeDataCenter(t.f0, new LatLongSeriable(t.f1, t.f2), t.f3);
            }
        }
    }

2015-05-21 14:22 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:

Concerning your first problem, that you only see one resulting centroid:
your code looks good, modulo the parts you haven't posted.

However, your problem could simply be caused by a bad selection of initial
centroids. If, for example, all centroids except for one don't get any
points assigned, then only one centroid will survive the iteration step.
How do you select them?

To check that all centroids are read, you can print the contents of the
centroids DataSet. Furthermore, you can simply println the new centroids
after each iteration step. In local mode you can then observe the
computation.

Cheers,
Till
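
A small, untested sketch of Till's println suggestion, assuming GeoTimeDataCenter has a meaningful toString(): an identity map with a side-effect print, appended after CentroidAverager inside the iteration.

    DataSet<GeoTimeDataCenter> newCentroids = points
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            .groupBy(0).reduce(new CentroidAccumulator())
            .map(new CentroidAverager())
            // debugging only: print each new centroid to stdout
            // (visible when running in local mode)
            .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
                public GeoTimeDataCenter map(GeoTimeDataCenter c) {
                    System.out.println("new centroid: " + c);
                    return c;
                }
            });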
On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <se...@apache.org> wrote:

Hi!

This problem should not depend on any user code. There are no user-code
dependent actors in Flink.

Is there more stack trace that you can send us? It looks like the core
exception that is causing the issue is not part of the stack trace.

Greetings,
Stephan

On Thu, May 21, 2015 at 11:11 AM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

Hi Flink community,

I have implemented k-means for clustering temporal geo data. I use the
following GitHub project and my own data structure:

https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

Now I have the problem that Flink reads the centroids from the file and
keeps working in parallel. If I look at the results, I have the feeling
that the program loads only one centroid point.

I work with Flink 0.8.1; if I update to 0.9-milestone-1, I get the
following exception:

    ERROR actor.OneForOneStrategy: exception during creation
    akka.actor.ActorInitializationException: exception during creation
        at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
        at akka.actor.ActorCell.create(ActorCell.scala:578)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
        at akka.dispatch.Mailbox.run(Mailbox.scala:218)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at akka.util.Reflect$.instantiate(Reflect.scala:65)
        at akka.actor.Props.newActor(Props.scala:337)
        at akka.actor.ActorCell.newActor(ActorCell.scala:534)
        at akka.actor.ActorCell.create(ActorCell.scala:560)
        ... 9 more

How can I tell Flink that it should wait until the dataset is loaded, and
what does this exception mean?

Best regards,
Paul