[ 
https://issues.apache.org/jira/browse/PIG-171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12598851#action_12598851
 ] 

Amir Youssefi commented on PIG-171:
-----------------------------------

OK, it's time to move this topic forward. Here are more details and response to 
comments:

My original idea was to implement this as a combiner (algebraic function). Just 
like SUM or MAX etc. , we develop a combiner which gets top K rows (by some 
comparator) on each map node and passes those rows to reducer. Reducer gets m*K 
rows (m being number of mappers) and only keeps Top K rows (using the same 
comparator) and gives it as output. Idea is to use following optimization 
opportunities:

A) Limit number of rows sent from mapper to reducer to K instead of all map 
output rows.
B) In mapper, since we know that intention is using only Top K rows then we can 
do very good optimizations e.g. in case of sort  (ORDER BY) we just keep top K 
rows (say having 100 sorted rows in memory and comparing new rows with those 
100 as we read/stream new rows). Let's call this TOP/LIMIT aware implementation.
C) Implementation of the part which combines results in reducer is similar to 
item 2 above.

Since we last discussed the issue, users/dev members have gotten more 
interested in this and feature found it's way into Pig Road Map. So we can 
upgrade idea from UDF to core built-in functionality. 

Now to answer Alan's comments: 

5) Name doesn't really matter MS SQL Server uses TOP, My SQL uses LIMIT, Oracle 
users rownum, row_number(partition by order by), rank() and dense_rank(). 
Academics use Top N, Top K in somewhat similar context. LIMIT sounds good to me 
as well. Especially if we want to have a range LIMIT n,m

4) That's correct. So one workaround was grouping by all columns and using 
"group" in foreach to have combiner kick-in. It's easy. Pipeline rework is 
nearby so we can skip workaround and use new things.

3) Now that we have attention of people/developers, we can upgrade idea from 
UDF to core built-in functionality. 

2) It's about what we define as top rows. We can say top 100 rows according to 
a sequence number (equivalent of Oracle row_number() , rownum). But in many 
scenarios, user is interested in top values not just number of rows (similar to 
oracle: rank()) or dense_rank()). Let's skip details of handling equal values 
for now and implement simples ones ten revisit when we have forged main issues.

1) I had to do that for UDF. Now that we are upgrading to Pig built-in we can 
use something like ORDER BY $0 using MY_COMPARATOR in one line then use LIMIT 
100 on second line and expect Pig to optimize implementation of ORDER taking 
into account that we care about 100 rows only. See item B above which means 
implementation of sort in a way that we just keep 100 rows (LIMIT/TOP aware 
implementation as opposed to sorting the whole data-set then keep 100 rows of 
it). Another way of design might be having both generic LIMIT that can be used 
on every variable and a LIMIT used on the same line of ORDER i.e. ORDER by $0 
using MY_COMPARATOR LIMIT 100. This may trigger LIMIT AWARE ORDER easier. I 
leave this to Alan.

We can consider above as our road-map but implement things step-by-step. First 
LIMIT K on one line pushing the row dropping to mappers. Then optimization of 
implementing LIMIT aware sort and such. A UDF can do both A and B. 


> Top K
> -----
>
>                 Key: PIG-171
>                 URL: https://issues.apache.org/jira/browse/PIG-171
>             Project: Pig
>          Issue Type: New Feature
>            Reporter: Amir Youssefi
>            Assignee: Amir Youssefi
>
> Frequently, users are interested on Top results (especially Top K rows) . 
> This can be implemented efficiently in Pig /Map Reduce settings to deliver 
> rapid results and low Network Bandwidth/Memory usage.
>  
>  Key point is to prune all data on the map side and keep only small set of 
> rows with Top criteria . We can do it in Algebraic function (combiner) with 
> multiple value output. Only a small data-set gets out of mapper node.
> The same idea is applicable to solve variants of this problem:
>   - An Algebraic Function for 'Top K Rows'
>   - An Algebraic Function for 'Top K' values ('Top Rank K' and 'Top Dense 
> Rank K')
>   - TOP K ORDER BY.
> Another words implementation is similar to combiners for aggregate functions 
> but instead of one value we get multiple ones. 
> I will add a sample implementation for Top K Rows and possibly TOP K ORDER BY 
> to clarify details.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to