Hi Felipe, Just like the ReduceFunction, the WindowFunction is applied in the context of a single key. So, it will be called for each key and always just see a single record (the reduced record of the key). You'd have to add a non-keyed window (allWindow) for your sorting WindowFunction. Note that this function cannot run in parallel.
Best, Fabian 2018-03-09 22:07 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>: > Hi all, > > I have a word count using flink stream and mey reduce transformations is > applying a WindowFunction. I would like that this WindowFunction sort the > output of the reduce. Is that possible? So I will sort by key the data set > inside the window. > > Thanks for your ideas! > > Here is my code: > > DataStream<Tuple2<String, Integer>> dataStream = env > .socketTextStream("localhost", 9000) > .map(new UpperCaserMap()) > .flatMap(new Splitter()) > .keyBy(new SumWordSelect()) // select the first value as a > key using the KeySelector class > .timeWindow(Time.seconds(5)) // use this if Apache Flink > server is up > .reduce(new SumWordsReduce(), new FIlterWindowFunction()) > ; > > public static class ReduceWindowFunction implements WindowFunction< > Tuple2<String, Integer>, // input type > Tuple2<String, Integer>, // output type > String, // key type > TimeWindow> { > > @Override > public void apply(String key, > TimeWindow window, > Iterable<Tuple2<String, Integer>> inputs, > Collector<Tuple2<String, Integer>> out) { > Integer sum = 0; > for (Tuple2<String, Integer> input : inputs) { > sum = sum + input.f1; > } > out.collect(new Tuple2<>(key, sum)); > } > } > > public static class FIlterWindowFunction implements WindowFunction< > Tuple2<String, Integer>, // input type > Tuple2<String, Integer>, // output type > String, // key type > TimeWindow> { > > @Override > public void apply(String key, > TimeWindow window, > Iterable<Tuple2<String, Integer>> inputs, > Collector<Tuple2<String, Integer>> out) { > // Integer value = 0; > for (Tuple2<String, Integer> input : inputs) { > // if (input.f1 >= 3 && input.f1 > value) value = input.f1; > out.collect(new Tuple2<>(key, input.f1)); > } > } > } > > > > -- > > *---- Felipe Oliveira Gutierrez* > > *-- skype: felipe.o.gutierrez* > *--* *https://felipeogutierrez.blogspot.com > <https://felipeogutierrez.blogspot.com>* >