[
https://issues.apache.org/jira/browse/PIG-171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12598851#action_12598851
]
Amir Youssefi commented on PIG-171:
-----------------------------------
OK, it's time to move this topic forward. Here are more details and responses
to the comments:
My original idea was to implement this as a combiner (algebraic function). Just
like SUM or MAX, we develop a combiner that takes the top K rows (by some
comparator) on each map node and passes those rows to the reducer. The reducer
receives up to m*K rows (m being the number of mappers), keeps only the top K
rows (using the same comparator), and emits them as output. The idea is to
exploit the following optimization opportunities:
A) Limit the number of rows sent from each mapper to the reducer to K instead
of all map output rows.
B) In the mapper, since we know that only the top K rows are wanted, we can
optimize aggressively. For example, in the case of a sort (ORDER BY) we keep
only the top K rows (say, holding 100 sorted rows in memory and comparing each
new row against those 100 as we read/stream the input). Let's call this a
TOP/LIMIT-aware implementation.
C) The part that combines results in the reducer is implemented much like
item B above.
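As a rough illustration of A through C, here is a minimal Python sketch (not Pig
code) of the algebraic decomposition. The function names `top_k_partial` and
`top_k_final` and the sample data are hypothetical; rows are plain integers and
"top" is assumed to mean largest by natural ordering.

```python
import heapq

def top_k_partial(rows, k, key=None):
    """Map/combiner side: keep at most k rows from one mapper's output."""
    return heapq.nlargest(k, rows, key=key)

def top_k_final(partials, k, key=None):
    """Reduce side: merge the per-mapper results (at most m*k rows)
    and keep the top k using the same ordering."""
    merged = [row for partial in partials for row in partial]
    return heapq.nlargest(k, merged, key=key)

# Three hypothetical mappers, each holding a slice of the input.
mapper_inputs = [[5, 1, 9, 3], [7, 2, 8], [4, 6, 10, 0]]
k = 2

# Each mapper ships at most k rows instead of its whole output (item A).
partials = [top_k_partial(rows, k) for rows in mapper_inputs]

# The reducer sees at most m*k rows and applies the same logic (item C).
result = top_k_final(partials, k)
```

With m = 3 mappers and K = 2, the reducer receives 6 rows rather than all 11.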
Since we last discussed the issue, users and developers have become more
interested in this, and the feature has found its way into the Pig roadmap. So
we can upgrade the idea from a UDF to core built-in functionality.
Now to answer Alan's comments:
5) The name doesn't really matter. MS SQL Server uses TOP, MySQL uses LIMIT, and
Oracle uses rownum, row_number() with partition by/order by, rank() and
dense_rank(). Academics use Top N or Top K in a somewhat similar context. LIMIT
sounds good to me as well, especially if we want to support a range form such
as LIMIT n,m.
4) That's correct. One workaround was to group by all columns and use "group"
in the foreach so that the combiner kicks in. It's easy. The pipeline rework is
coming soon, so we can skip the workaround and use the new machinery.
3) Now that we have the attention of people/developers, we can upgrade the idea
from a UDF to core built-in functionality.
2) It's about what we define as top rows. We can say the top 100 rows according
to a sequence number (the equivalent of Oracle's row_number() or rownum). But in
many scenarios the user is interested in top values, not just a number of rows
(similar to Oracle's rank() or dense_rank()). Let's skip the details of
handling equal values for now and implement the simple cases, then revisit once
the main issues are settled.
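To make the distinction concrete, here is a small Python sketch of the three
notions of "top K" when values tie. The function `top_k_by` and the sample
values are hypothetical, and the row_number/rank/dense_rank behavior is assumed
to follow the usual SQL definitions.

```python
def top_k_by(rows, k, mode):
    """Return rows whose position under `mode` is <= k, largest values first.

    mode is one of:
      "row_number": every row gets a distinct position (exactly k rows)
      "rank":       ties share a position, and gaps follow the ties
      "dense_rank": ties share a position, with no gaps
    """
    ordered = sorted(rows, reverse=True)
    out = []
    rank = dense = 0
    prev = object()  # sentinel that equals no row value
    for i, value in enumerate(ordered, start=1):
        if value != prev:
            rank = i      # rank jumps to the current row number
            dense += 1    # dense rank advances by exactly one
            prev = value
        position = {"row_number": i, "rank": rank, "dense_rank": dense}[mode]
        if position > k:
            break         # positions never decrease, so we can stop here
        out.append(value)
    return out

values = [50, 40, 40, 30, 30, 20]
by_row_number = top_k_by(values, 3, "row_number")
by_rank = top_k_by(values, 3, "rank")
by_dense_rank = top_k_by(values, 3, "dense_rank")
```

Only the row-number form bounds the output at exactly K rows; the rank-based
forms may return more when values tie, which is the detail deferred above.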
1) I had to do that for the UDF. Now that we are upgrading to a Pig built-in, we
can write something like ORDER BY $0 using MY_COMPARATOR on one line, then
LIMIT 100 on a second line, and expect Pig to optimize the implementation of
ORDER knowing that we care about only 100 rows. See item B above: the sort is
implemented so that it keeps just 100 rows (a LIMIT/TOP-aware implementation,
as opposed to sorting the whole data set and then keeping 100 rows of it).
Another design might offer both a generic LIMIT that can be applied to any
variable and a LIMIT on the same line as ORDER, i.e. ORDER by $0 using
MY_COMPARATOR LIMIT 100. This may make it easier to trigger the LIMIT-aware
ORDER. I leave this to Alan.
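A LIMIT-aware ORDER of this kind could look roughly like the following Python
sketch (not Pig code). The names `limit_aware_order` and `my_comparator` are
hypothetical; `my_comparator` merely stands in for MY_COMPARATOR and is assumed
to order rows by their first field, descending.

```python
import bisect
from functools import cmp_to_key
from itertools import count

def limit_aware_order(rows, comparator, limit):
    """Stream rows once, holding at most `limit` rows sorted by `comparator`."""
    if limit <= 0:
        return []
    to_key = cmp_to_key(comparator)
    seq = count()   # arrival order breaks ties, so rows are never compared
    kept = []       # always sorted; kept[-1] is the worst row retained so far
    for row in rows:
        entry = (to_key(row), next(seq), row)
        if len(kept) < limit:
            bisect.insort(kept, entry)
        elif entry < kept[-1]:
            # The new row sorts before the worst retained row: swap it in.
            kept.pop()
            bisect.insort(kept, entry)
        # Otherwise the row cannot be in the first `limit` rows; drop it.
    return [row for _, _, row in kept]

def my_comparator(a, b):
    """Hypothetical comparator: order by first field, descending."""
    return (b[0] > a[0]) - (b[0] < a[0])

rows = [(3, "c"), (9, "i"), (1, "a"), (7, "g"), (9, "j"), (5, "e")]
first_three = limit_aware_order(rows, my_comparator, 3)
```

Memory stays bounded by the limit regardless of input size, in contrast to
sorting the full data set and truncating afterwards.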
We can treat the above as our roadmap but implement things step by step. First,
LIMIT K on one line, pushing the row dropping down to the mappers. Then the
optimization of implementing a LIMIT-aware sort and the like. A UDF can do both
A and B.
> Top K
> -----
>
> Key: PIG-171
> URL: https://issues.apache.org/jira/browse/PIG-171
> Project: Pig
> Issue Type: New Feature
> Reporter: Amir Youssefi
> Assignee: Amir Youssefi
>
> Frequently, users are interested in Top results (especially Top K rows).
> This can be implemented efficiently in Pig/MapReduce settings to deliver
> rapid results and low network bandwidth/memory usage.
>
> The key point is to prune the data on the map side and keep only a small set
> of rows matching the Top criteria. We can do it in an Algebraic function
> (combiner) with multiple-value output. Only a small data set leaves each
> mapper node.
> The same idea is applicable to solve variants of this problem:
> - An Algebraic Function for 'Top K Rows'
> - An Algebraic Function for 'Top K' values ('Top Rank K' and 'Top Dense
> Rank K')
> - TOP K ORDER BY.
> In other words, the implementation is similar to combiners for aggregate
> functions, but instead of one value we get multiple ones.
> I will add a sample implementation for Top K Rows and possibly TOP K ORDER BY
> to clarify details.