My algorithm is roughly as follows, taking the top-K words problem as an example
(the purpose of computing a local “word count” first is to deal with data skew, i.e. to pre-aggregate frequent words before the global count):

DataStream of words ->
timeWindow of 1h ->
converted to DataSet of words ->
random partitioning by rebalance ->
local “word count” using mapPartition ->
global “word count” using reduceGroup ->
rebalance ->
local top-K using mapPartition ->
global top-K using reduceGroup

Here is some (probably buggy) code that demonstrates the basic idea with the DataSet API:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.SortedMap;
import java.util.TreeMap;

public class WordCount {

  public static void main(String[] args) throws Exception {

    // set up the execution environment
    final ExecutionEnvironment env =

    // get input data
    DataSet<String> text = env.fromElements(

    DataSet<Tuple2<String, Integer>> counts = text
        // split up the lines in pairs (2-tuples) containing: (word,1)
        .flatMap(new LineSplitter())
        // local word count
        .mapPartition(new MapPartitionFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>>() {
          public void mapPartition(Iterable<Tuple2<String, Integer>> words,
                       Collector<Tuple2<String, Integer>> out) throws
Exception {
            SortedMap<String, Integer> m = new TreeMap<String, Integer>();
            for (Tuple2<String, Integer> w : words) {
              Integer current = m.get(w.f0);
              Integer updated = current == null ? w.f1 : current + w.f1;
              m.put(w.f0, updated);

            for (Map.Entry<String, Integer> e : m.entrySet()) {
              out.collect(Tuple2.of(e.getKey(), e.getValue()));
        // global word count
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>,
Tuple2<String, Integer>>() {
          public void reduce(Iterable<Tuple2<String, Integer>> wordcounts,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
            SortedMap<String, Integer> m = new TreeMap<String, Integer>();
            for (Tuple2<String, Integer> wc : wordcounts) {
              Integer current = m.get(wc.f0);
              Integer updated = current == null ? wc.f1 : current + wc.f1;
              m.put(wc.f0, updated);

            for (Map.Entry<String, Integer> e : m.entrySet()) {
              out.collect(Tuple2.of(e.getKey(), e.getValue()));

    DataSet<Tuple2<String, Integer>> topK = counts
        // local top-K
        .mapPartition(new MapPartitionFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>>() {
          public void mapPartition(Iterable<Tuple2<String, Integer>> wordcounts,
                       Collector<Tuple2<String, Integer>> out) throws
Exception {
            SortedMap<Integer, String> topKSoFar = new
TreeMap<Integer, String>();
            for (Tuple2<String, Integer> wc : wordcounts) {
              String w = wc.f0;
              Integer c = wc.f1;
              topKSoFar.put(c, w);
              if (topKSoFar.size() > 3) {

            for (Map.Entry<Integer, String> cw : topKSoFar.entrySet()) {
              out.collect(Tuple2.of(cw.getValue(), cw.getKey()));
        // global top-K
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>,
Tuple2<String, Integer>>() {
          public void reduce(Iterable<Tuple2<String, Integer>> topList,
                    Collector<Tuple2<String, Integer>> out) throws Exception {
            SortedMap<Integer, String> topKSoFar = new
TreeMap<Integer, String>();
            for (Tuple2<String, Integer> wc : topList) {
              String w = wc.f0;
              Integer c = wc.f1;
              topKSoFar.put(c, w);
              if (topKSoFar.size() > 3) {

            for (Map.Entry<Integer, String> cw : topKSoFar.entrySet()) {
              out.collect(Tuple2.of(cw.getValue(), cw.getKey()));

    // execute and print result



  public static final class LineSplitter implements
FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      String[] tokens = value.split("");

      for (String token : tokens) {
        if (token.length() > 0) {
          out.collect(new Tuple2<String, Integer>(token, 1));

Reply via email to