Caizhi Weng created FLINK-13236:
-----------------------------------

             Summary: Fix bug and improve performance in TopNBuffer
                 Key: FLINK-13236
                 URL: https://issues.apache.org/jira/browse/FLINK-13236
             Project: Flink
          Issue Type: Improvement
            Reporter: Caizhi Weng
            Assignee: Caizhi Weng


In {{TopNBuffer}} we have the following method:

{code:java}
/**
 * Puts a record list into the buffer under the sortKey.
 * Note: if buffer already contains sortKey, putAll will overwrite the previous value
 *
 * @param sortKey sort key with which the specified values are to be associated
 * @param values record lists to be associated with the specified key
 */
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
        treeMap.put(sortKey, values);
        currentTopNum += values.size();
}
{code}

When {{sortKey}} already exists in {{treeMap}}, {{currentTopNum}} must first be 
reduced by the size of the old collection stored under that key, and only then 
increased by {{values.size()}}. Adding the new size directly also counts the 
overwritten records. Because {{AppendOnlyTopNFunction}} is currently the only 
caller, and it uses the method only in its init procedure, this bug is not 
triggered.
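
A minimal sketch of the corrected bookkeeping described above. It is not the actual patch: {{PutAllSketch}} is a hypothetical stand-in for {{TopNBuffer}}, and {{String}} replaces {{BaseRow}} so that the example compiles without Flink on the classpath. It relies on {{TreeMap.put}} returning the previous value for the key, or null if there was none.

```java
import java.util.Collection;
import java.util.TreeMap;

// Hypothetical stand-in for TopNBuffer; String replaces BaseRow (assumption).
class PutAllSketch {
    private final TreeMap<String, Collection<String>> treeMap = new TreeMap<>();
    private int currentTopNum = 0;

    void putAll(String sortKey, Collection<String> values) {
        // put() returns the collection previously stored under sortKey, or null.
        Collection<String> old = treeMap.put(sortKey, values);
        if (old != null) {
            // The old records were overwritten, so they no longer count.
            currentTopNum -= old.size();
        }
        currentTopNum += values.size();
    }

    int getCurrentTopNum() {
        return currentTopNum;
    }
}
```

With this version, overwriting a key that held three records with a single record lowers the count by two instead of raising it by one.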


{code:java}
/**
 * Gets record which rank is given value.
 *
 * @param rank rank value to search
 * @return the record which rank is given value
 */
BaseRow getElement(int rank) {
        int curRank = 0;
        Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter =
                treeMap.entrySet().iterator();
        while (iter.hasNext()) {
                Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
                Collection<BaseRow> list = entry.getValue();

                Iterator<BaseRow> listIter = list.iterator();
                while (listIter.hasNext()) {
                        BaseRow elem = listIter.next();
                        curRank += 1;
                        if (curRank == rank) {
                                return elem;
                        }
                }
        }
        return null;
}
{code}

We can avoid visiting every record by advancing {{curRank}} by {{list.size()}} 
for each entry, so that the inner loop runs only for the entry that contains 
the requested rank.
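
One possible shape of that optimization, as a sketch rather than the actual patch. {{GetElementSketch}} is a hypothetical helper, {{String}} again stands in for {{BaseRow}}, and the map is passed as a parameter to keep the example self-contained. Ranks are 1-based, as in the original method. The speed-up assumes {{size()}} is cheap for the stored collections, which holds for the common {{java.util}} collections such as {{ArrayList}}.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; String replaces BaseRow (assumption).
class GetElementSketch {
    static String getElement(TreeMap<String, Collection<String>> treeMap, int rank) {
        if (rank <= 0) {
            return null; // ranks are 1-based, same result as the original loop
        }
        int curRank = 0;
        for (Map.Entry<String, Collection<String>> entry : treeMap.entrySet()) {
            Collection<String> list = entry.getValue();
            if (curRank + list.size() >= rank) {
                // The requested rank lies in this entry: walk only this list.
                Iterator<String> listIter = list.iterator();
                String elem = null;
                for (int i = curRank; i < rank; i++) {
                    elem = listIter.next();
                }
                return elem;
            }
            // Skip the whole entry without touching its elements.
            curRank += list.size();
        }
        return null; // rank exceeds the number of buffered records
    }
}
```

A {{Collection}} has no positional access, so the entry that holds the target rank still has to be iterated. All earlier entries are skipped in a single step each.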




