Hi Felipe,

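Just like the ReduceFunction, the WindowFunction is applied in the context
of a single key. It is called once per key and window, and because the
ReduceFunction pre-aggregates the window, it always sees just a single
record (the reduced record of that key).
To sort across keys, you'd have to add a non-keyed window (windowAll /
timeWindowAll) after the reduce and do the sorting in a function on that
window (an AllWindowFunction).
Note that a function on a non-keyed window cannot run in parallel; it
always runs with a parallelism of 1.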
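
To make the idea concrete, here is a minimal sketch of the sorting step in
plain Java, without any Flink dependency. The class name WindowSortSketch
and the helper sortByKey are hypothetical, and Map.Entry stands in for
Tuple2<String, Integer>. In a real job, I would expect the same logic to
live in the apply() method of an AllWindowFunction placed after the reduce,
where the Iterable holds the reduced records of all keys in the window.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of the sorting step; Map.Entry stands in for Tuple2.
final class WindowSortSketch {

    // Hypothetical helper: buffers every per-key result of one window and
    // returns the records ordered by key. This only works when the function
    // sees all keys of the window at once, i.e. on a non-keyed window.
    static List<Map.Entry<String, Integer>> sortByKey(
            Iterable<Map.Entry<String, Integer>> windowContents) {
        List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
        for (Map.Entry<String, Integer> record : windowContents) {
            buffer.add(record);
        }
        buffer.sort(Map.Entry.comparingByKey());
        return buffer;
    }

    public static void main(String[] args) {
        // One reduced record per key, as the keyed reduce would emit them.
        List<Map.Entry<String, Integer>> reduced = new ArrayList<>();
        reduced.add(new SimpleEntry<>("FLINK", 3));
        reduced.add(new SimpleEntry<>("APACHE", 1));
        reduced.add(new SimpleEntry<>("STREAM", 2));

        for (Map.Entry<String, Integer> e : sortByKey(reduced)) {
            System.out.println(e.getKey() + " " + e.getValue());
        }
    }
}
```

Running main prints the records in key order: APACHE, FLINK, STREAM. The
whole window has to be buffered before anything can be emitted, which is
why this step cannot be split across parallel instances.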

Best, Fabian

2018-03-09 22:07 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:

> Hi all,
>
> I have a word count using Flink streaming, and my reduce transformation
> applies a WindowFunction. I would like this WindowFunction to sort the
> output of the reduce. Is that possible? That way I would sort the data set
> inside the window by key.
>
> Thanks for your ideas!
>
> Here is my code:
>
>         DataStream<Tuple2<String, Integer>> dataStream = env
>                 .socketTextStream("localhost", 9000)
>                 .map(new UpperCaserMap())
>                 .flatMap(new Splitter())
>                 // select the first value as a key using the KeySelector class
>                 .keyBy(new SumWordSelect())
>                 // use this if Apache Flink server is up
>                 .timeWindow(Time.seconds(5))
>                 .reduce(new SumWordsReduce(), new FIlterWindowFunction())
>                 ;
>
>     public static class ReduceWindowFunction implements WindowFunction<
>             Tuple2<String, Integer>, // input type
>             Tuple2<String, Integer>, // output type
>             String, // key type
>             TimeWindow> {
>
>         @Override
>         public void apply(String key,
>                           TimeWindow window,
>                           Iterable<Tuple2<String, Integer>> inputs,
>                           Collector<Tuple2<String, Integer>> out) {
>             Integer sum = 0;
>             for (Tuple2<String, Integer> input : inputs) {
>                 sum = sum + input.f1;
>             }
>             out.collect(new Tuple2<>(key, sum));
>         }
>     }
>
>     public static class FIlterWindowFunction implements WindowFunction<
>             Tuple2<String, Integer>, // input type
>             Tuple2<String, Integer>, // output type
>             String, // key type
>             TimeWindow> {
>
>         @Override
>         public void apply(String key,
>                           TimeWindow window,
>                           Iterable<Tuple2<String, Integer>> inputs,
>                           Collector<Tuple2<String, Integer>> out) {
>             // Integer value = 0;
>             for (Tuple2<String, Integer> input : inputs) {
>                 // if (input.f1 >= 3 && input.f1 > value) value = input.f1;
>                 out.collect(new Tuple2<>(key, input.f1));
>             }
>         }
>     }
>
>
>
> --
>
> *---- Felipe Oliveira Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com*
>
