Ashutosh Chauhan commented on PIG-845:

Hi Pradeep,

Thanks for the review. Please find my comments inline.

1) In LogicalPlanTester.java, why is the following change required?
Typically when PigContext is constructed in Map-reduce mode, the properties 
should correspond to the cluster configuration. So the above initialization 
seems odd because the Properties object is an empty object in the constructor 
call above.

>> This is required because in local mode merge join gets rewritten as a 
>> regular join. So, if we had exec type as local, the plan which I get in 
>> MRCompiler corresponds to regular join plan against which we cant test merge 
>> join plan. Properties object has no bearing here, because LogicalPlanTester 
>> is used only for testing logical plans. Further I think all our tests should 
>> have exec type as MapReduce because we want to test the correctness in 
>> MapReduce mode.

2) In PigMapBase.java:
public static final String END_OF_INP_IN_MAP = "pig.stream.in.map";
can change to
public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map"; 
and this should be put as a public static member of JobControlCompiler.
In JobControlCompiler.java,
jobConf.set("pig.stream.in.map", "true"); should change to use the above public 
static String.
>> Will update this in new patch.

3) Remove the following comment in QueryParser.jjt (line 302):
* Join parser. Currently can only handle skewed joins.
>> Will be removed in next patch.

4) In QueryParser.jjt the joinPlans passed to LOJoin constructor is not a 
but in LogToPhyTranslationVistior the join plans are put in a LinkedMultiMap. 
If order is
important, shouldn't QueryParser.jjt also change?
>> Good catch. Order is indeed important. Will fix this in next patch.

5) Some comments in LogToPhyTranslationVisitor about the different lists and 
maps would help
>> those lists and maps were there earlier also, I didnt introduce anything 
>> new. I just moved them around :) But I agree that section needs to be 
>> documented better. Also took me a while to get my head around it. Will 
>> include comment about purpose of each in next patch.

6) In validateMergeJoin() - the code only considers direct successors and 
predecessors of LOJoin. It should check the entire plan and ensure that 
predecessors of LOJoin all the way to the LOLoad are only LOForEach and 
LOFilter. Strictly we should not allow LOForeach since it could change sort 
order or position of join keys and hence invalidate the index - but we need it
so that the Foreach introduced by the TypeCastInserter when there is a schema 
for either of the inputs remains. You should note in the documentation that 
only Order and join key position preserving Foreachs and Filters are allowed as 
predecessors to merge join and check the same in validateMergeJoin() - it is 
better to use a whitelist of allowed operators than a blacklist
of disallowed once (since then the blacklist would need to be updated anytime a 
new operator comes along. The exception source here is not really a bug but a 
user input error since merge join really doesnot support other ops.

Again for the successor, all successors from mergejoin down to map leaf should 
be checked to ensure stream is absent (really there should be no restriction on 
stream being present after the join - if there is an issue currently with this, 
it is fine to not allow stream but eventually it would be good to not have any 
restriction on what follows the merge join). You can just use a visitor to 
check presence of stream in the plan - this should be done after complete 
LogToPhyTranslation is done - in visit() so that the whole plan can be looked 

>> Agreed. I fixed the bug for Streaming. Now there is no restriction for what 
>> follows Merge Join. For predecessors, I included new function which walks 
>> all the way up to make sure operators preceding merge join are the only the 
>> ones among the whitelist of LOLoad or LOForEach or LOFilter.
7) Is MRStreamHandler.java now replaced by 
>> Yes.

8) Some of MRCompilerExceptions do not follow the Error handling spec - 
errcode, errMsg, Src
>> Will update them.

9) Should assert() statements in MRCompiler be replaced with Exceptions since 
assertions are disabled by default in Java.
>> Will update them.

10) In MRCompiler.java I wonder if you should change
rightMapPlan.disconnect(rightLoader, loadSucc);
We really want to remove all operators in rightMapPlan other than the loader.
>> Didn't know about this function. This indeed is the one which is needed here.

11) We should note in documentation that merge join only works for data sorted 
in ascending order. (the MRCompiler code assumes this - we should have sort 
check if possible - see performance comment below)
>> Will include in comments.

12) It would be good to add a couple of unit tests with a few operators after 
merge join to ensure merge join operators well with successors in the plan.
>> there was one already with load - load -join -join-union-filter. Will 
>> include one more which introduces MR boundary after merge -join.
load -load -join-group-filter.

13) In POMergeJoin.java, comments about foreach should be cleaned up since 
foreach is no longer used. 
>> Will update it.

The following code can be factored out into a function since its repeated twice:
>> If you see closely, its not exact repetition thus can't be factored out.

A couple of things to try and check impact on performance:
1) Introduce checks for sortedness of inputs to merge join
>> I introduced these checks and benchmarked and there was no noticeable 
>> difference in CPU times, so I am including them. Now POMergeJoin checks for 
>> data sortedness and fails if it finds data isn't sorted. 

2) Increase sample size from 1 per map to say 10 per map
>> This is a classic case of dense vs sparse index trade-offs. Dense index will 
>> be beneficial when there are lots of distinct keys but takes longer to 
>> build. On the other hand if there are lots of rows corresponding to same key 
>> this wouldn't buy us much, infact may hurt as we will spend more time in 
>> index construction time. Moreover, a better index essentially may help us to 
>> cut down on the read times of right side. In my experimentation I found read 
>> times are negligible compared to actually producing joined tuples and 
>> writing them out to DFS ( in order of tens of seconds for task lasting 
>> couple of hours). So this needs to be thought bit more carefully and 
>> benchmarked. For now I am sampling one tuple per map block.

> -----------------------
>                 Key: PIG-845
>                 URL: https://issues.apache.org/jira/browse/PIG-845
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Olga Natkovich
>            Assignee: Ashutosh Chauhan
>         Attachments: merge-join-1.patch, merge-join-for-review.patch
> Thsi join would work if the data for both tables is sorted on the join key.

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.

Reply via email to