Hi Jeffrey,

Flink uses a (potentially external) merge-sort to group data. Combining is done using an in-memory sort. Because Flink uses pipelined data transfer, the execution of operators in a program can overlap. For example, in WordCount the sort of a groupBy starts as soon as the first record has been read, tokenized, and shuffled to the sorter, i.e., the data is not sent as a batch from the tokenizer to the reducer but streamed.
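To make the grouping strategy concrete, here is a toy, single-process Python model of sort-based grouping for WordCount. It is a sketch under stated assumptions, not Flink's implementation: Flink sorts serialized records in its own managed memory and spills runs to disk, whereas here a Python list stands in for each spilled run, and `buffer_size` is a hypothetical stand-in for the sort buffer's memory budget. All function names are made up for illustration. The tokenizer is a generator, so the combiner starts sorting as soon as the first pairs arrive, loosely mirroring the pipelined transfer described above. A dict-based combiner is included only for contrast with hash-based aggregation.

```python
import heapq
from itertools import groupby
from operator import itemgetter
from typing import Dict, Iterable, Iterator, List, Tuple

Pair = Tuple[str, int]


def tokenize(lines: Iterable[str]) -> Iterator[Pair]:
    """Emit (word, 1) pairs lazily, so downstream work can begin with the first record."""
    for line in lines:
        for word in line.lower().split():
            yield (word, 1)


def collapse_sorted(pairs: Iterable[Pair]) -> List[Pair]:
    """Sum the counts of adjacent equal keys; input must already be sorted by key."""
    return [(key, sum(c for _, c in grp)) for key, grp in groupby(pairs, key=itemgetter(0))]


def sort_combine(pairs: Iterable[Pair], buffer_size: int) -> List[List[Pair]]:
    """Sort-based combiner (hypothetical model): fill a bounded buffer, sort it,
    collapse equal keys, and 'spill' the result as a sorted run.
    Each run is a plain list standing in for a spill file."""
    runs: List[List[Pair]] = []
    buffer: List[Pair] = []
    for pair in pairs:  # consumes the tokenizer lazily, record by record
        buffer.append(pair)
        if len(buffer) >= buffer_size:
            buffer.sort(key=itemgetter(0))
            runs.append(collapse_sorted(buffer))
            buffer = []
    if buffer:  # flush the last, partially filled buffer
        buffer.sort(key=itemgetter(0))
        runs.append(collapse_sorted(buffer))
    return runs


def merge_reduce(runs: List[List[Pair]]) -> List[Pair]:
    """k-way merge of the sorted runs, then a final reduce over equal keys.
    This step needs every run, so grouped output appears only after the input ends."""
    merged = heapq.merge(*runs, key=itemgetter(0))
    return collapse_sorted(merged)


def hash_combine(pairs: Iterable[Pair]) -> Dict[str, int]:
    """Hash-based alternative for contrast: one dict entry per distinct key."""
    counts: Dict[str, int] = {}
    for word, c in pairs:
        counts[word] = counts.get(word, 0) + c
    return counts


lines = ["to be or not to be", "to do is to be"]
print(merge_reduce(sort_combine(tokenize(lines), buffer_size=4)))
```

With only 1000 distinct words, as in the quoted question, the dict-based variant holds at most 1000 entries however large the input is, while the sort-based variant still sorts every buffered record before collapsing it. That difference is one reason hash-based combining tends to be cheaper at low key cardinality.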
Compared to hash-based aggregations, sort-based aggregations are often less efficient, especially for low numbers of distinct keys. Flink puts a focus on execution robustness, which is why it implements its internal algorithms (sort, hash tables) on off-heap memory in a way that they do not fail for larger data sets or higher numbers of distinct keys (no parameter tuning needed to get a program working). Since this takes more effort than just implementing or using a hash table that resides on the JVM heap, we haven't added a hash-based combiner yet.

When you say Flink WordCount is slow, which numbers are you comparing it to?

Best,
Fabian

2015-10-30 7:00 GMT+01:00 Jinfeng Li:

> Hi, I find wordcount on Flink is slow and 75% of the time is spent on
> groupBy operator. The dataset is 90G, with only 1000 distinct words. Could
> you tell me how the groupBy is implemented?
>
> Best Regards,
> Jeffrey
