Hi! When I try to apply a groupReduce function to a data set I get an error.
The data set is created like this: DataSet<Tuple3<String, String, String>> actorsTemp = env.readCsvFile("/home/lea/Documents/impro3_ws15/actors.tsv") .fieldDelimiter("\t") .includeFields("1110") .types(String.class, String.class, String.class); DataSet<Tuple3<String, String, String>> actresses = env.readCsvFile("/home/lea/Documents/impro3_ws15/actresses.tsv") .fieldDelimiter("\t") .includeFields("1110") .types(String.class, String.class, String.class); DataSet<Tuple3<Float, Float, String>> ratings = env.readCsvFile("/home/lea/Documents/impro3_ws15/ratings.tsv") .fieldDelimiter("\t") .includeFields("0111") .types(Float.class, Float.class, String.class) .filter(new NumberVotesFilter()); //merge actors and actresses DataSet<Tuple3<String, String, String>> actors = actorsTemp.union(actresses); //create weighted rating DataSet<Tuple2<String, Float>> weightedRatings = ratings.map(new WeightedRatingCalculator()); THIS IS WHAT I'M TRYING IN THE MAIN METHOD: actors.map(new JoinNames()) .join(weightedRatings) .where(1).equalTo(0) .projectFirst(0).projectSecond(1) .groupBy(0) .reduceGroup(new MeanRatingCalculator()) .first(10).print(); And here is the GroupReduce function I wrote: public static class MeanRatingCalculator implements GroupReduceFunction<Tuple2<String, Float>, Tuple3<String, Float, Integer>> { public void reduce(Iterable<Tuple2<String, Float>> ratedActors, Collector<Tuple3<String, Float, Integer>> out) throws Exception { String name = null; Float ratings = 0F; int numberOfMovies = 0; for (Tuple2<String, Float> a : ratedActors) { //store the name name = a.f0; //update the sum of the ratings and number of movies ratings += a.f1; numberOfMovies++; } // emit name, average rating and number of films out.collect(new Tuple3<String, Float, Integer>(name, ratings/(float)numberOfMovies, numberOfMovies)); } } I get the following error message when I try to compile the code: java: method reduceGroup in class org.apache.flink.api.java.operators.UnsortedGrouping<T> cannot be applied to given types; required: org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R> found: de.tub.dima.TopActors.MeanRatingCalculator reason: no instance(s) of type variable(s) R exist so that argument type de.tub.dima.TopActors.MeanRatingCalculator conforms to formal parameter type org.apache.flink.api.common.functions.GroupReduceFunction<org.apache.flink.api.java.tuple.Tuple,R> I can't figure out what the problem might be and would be very grateful for any help!! I hope I have given all the necessary information. I'm using Ubuntu 14.04 and IntelliJ Idea as IDE. Thank you very much, Lea