Caizhi Weng created FLINK-13236:
-----------------------------------

             Summary: Fix bug and improve performance in TopNBuffer
                 Key: FLINK-13236
                 URL: https://issues.apache.org/jira/browse/FLINK-13236
             Project: Flink
          Issue Type: Improvement
            Reporter: Caizhi Weng
            Assignee: Caizhi Weng

In {{TopNBuffer}} we have the following method:

{code:java}
/**
 * Puts a record list into the buffer under the sortKey.
 * Note: if buffer already contains sortKey, putAll will overwrite the previous value
 *
 * @param sortKey sort key with which the specified values are to be associated
 * @param values record lists to be associated with the specified key
 */
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
    treeMap.put(sortKey, values);
    currentTopNum += values.size();
}
{code}

When {{sortKey}} already exists in {{treeMap}}, {{currentTopNum}} should first be reduced by the size of the old value stored in {{treeMap}} and only then increased by {{values.size()}}. Adding directly leaves the overwritten records counted. As currently only {{AppendOnlyTopNFunction}} uses this method, in its init procedure, this bug is not triggered.

There is also the following method:

{code:java}
/**
 * Gets record which rank is given value.
 *
 * @param rank rank value to search
 * @return the record which rank is given value
 */
BaseRow getElement(int rank) {
    int curRank = 0;
    Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
        Collection<BaseRow> list = entry.getValue();
        Iterator<BaseRow> listIter = list.iterator();
        while (listIter.hasNext()) {
            BaseRow elem = listIter.next();
            curRank += 1;
            if (curRank == rank) {
                return elem;
            }
        }
    }
    return null;
}
{code}

We can skip the inner loop for most entries by advancing {{curRank}} by {{list.size()}} per entry, iterating only within the list that contains the requested rank.
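
The two changes described above could look roughly like the following. This is only a minimal, self-contained sketch, not the actual patch: the class name {{TopNBufferSketch}} and the generic types {{K}} and {{V}} (standing in for {{BaseRow}} sort keys and records) are assumptions made so the example compiles without Flink on the classpath.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for TopNBuffer; K and V replace BaseRow. */
class TopNBufferSketch<K, V> {
    private final TreeMap<K, Collection<V>> treeMap;
    private int currentTopNum = 0;

    TopNBufferSketch(Comparator<K> sortKeyComparator) {
        this.treeMap = new TreeMap<>(sortKeyComparator);
    }

    /** Replaces the list stored under sortKey and keeps currentTopNum consistent. */
    void putAll(K sortKey, Collection<V> values) {
        // TreeMap.put returns the previous value, or null if the key was absent.
        Collection<V> oldValues = treeMap.put(sortKey, values);
        if (oldValues != null) {
            currentTopNum -= oldValues.size();
        }
        currentTopNum += values.size();
    }

    /** Returns the record at the given 1-based rank, or null if there is none. */
    V getElement(int rank) {
        int curRank = 0;
        for (Map.Entry<K, Collection<V>> entry : treeMap.entrySet()) {
            Collection<V> list = entry.getValue();
            if (curRank + list.size() >= rank) {
                // The requested rank falls inside this list, so walk only this one.
                for (V elem : list) {
                    curRank += 1;
                    if (curRank == rank) {
                        return elem;
                    }
                }
            } else {
                // The whole list ranks before the target; skip it in one step.
                curRank += list.size();
            }
        }
        return null;
    }

    int getCurrentTopNum() {
        return currentTopNum;
    }
}
```

In this sketch, overwriting an existing sort key leaves {{getCurrentTopNum()}} equal to the number of records actually held, and {{getElement}} still has to iterate within one list because {{Collection}} offers no positional access.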