Pradeep Kamath commented on PIG-845:

Review comments:
1) In LogicalPlanTester.java, why is the following change required?
@@ -198,7 +198,7 @@
     private LogicalPlan buildPlan(String query, ClassLoader cldr) {
         LogicalPlanBuilder.classloader = 
LogicalPlanTester.class.getClassLoader() ;
-        PigContext pigContext = new PigContext(ExecType.LOCAL, new 
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new 
         try {
         } catch (ExecException e1) {

Typically, when PigContext is constructed in map-reduce mode, the properties 
should correspond to the cluster configuration. The initialization above 
seems odd because the constructor call passes an empty Properties object.

2) In PigMapBase.java:

public static final String END_OF_INP_IN_MAP = "pig.stream.in.map";

can change to

public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map"; 
and this should be put as a public static member of JobControlCompiler.

In JobControlCompiler.java,

jobConf.set("pig.stream.in.map", "true");  should change to use the above 
public static String.
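
The rename and relocation suggested in item 2 could be sketched roughly as 
below. This is only an illustration: java.util.Properties stands in for the 
Hadoop JobConf so the snippet is self-contained, and the class name 
JobControlCompilerSketch and the method markBlockingOperatorInMap are 
hypothetical. Only the constant name and its new value come from the comment 
above.

```java
import java.util.Properties;

// Hypothetical stand-in for JobControlCompiler; not the real Pig class.
final class JobControlCompilerSketch {
    // Constant name and value as proposed in the review comment.
    public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map";

    // Assumption: Properties is used here in place of Hadoop's JobConf.
    static void markBlockingOperatorInMap(Properties jobConf) {
        // Refer to the shared constant instead of repeating the string literal.
        jobConf.setProperty(END_OF_INP_IN_MAP, "true");
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        markBlockingOperatorInMap(conf);
        System.out.println(END_OF_INP_IN_MAP + "=" + conf.getProperty(END_OF_INP_IN_MAP));
    }
}
```

With a single shared constant, the writer (JobControlCompiler) and the reader 
(PigMapBase) cannot drift apart if the key is renamed again.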

3) Remove the following comment in QueryParser.jjt (line 302):
    * Join parser. Currently can only handle skewed joins.        

4) In QueryParser.jjt the joinPlans passed to LOJoin constructor is not a 
but in LogToPhyTranslationVisitor the join plans are put in a LinkedMultiMap. 
If order is important, shouldn't QueryParser.jjt also change?

5) Some comments in LogToPhyTranslationVisitor about the different lists and 
maps would help :)

6) In validateMergeJoin(), the code only considers the direct successors and 
predecessors of LOJoin. It should check the entire plan and ensure that the 
predecessors of LOJoin, all the way up to the LOLoad, are only LOForEach and 
LOFilter. Strictly, we should not allow LOForEach, since it could change the 
sort order or the position of the join keys and hence invalidate the index, but 
we need it so that the ForEach introduced by the TypeCastInserter (when there 
is a schema for either of the inputs) remains. You should note in the 
documentation that only ForEachs and Filters that preserve order and join key 
position are allowed as predecessors to merge join, and check the same in 
validateMergeJoin(). It is better to use a whitelist of allowed operators than 
a blacklist of disallowed ones, since the blacklist would need to be updated 
any time a new operator comes along. The exception source here is not really a 
bug but a user input error, since merge join does not support other ops.
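
A minimal sketch of the whitelist-based predecessor check described above, 
assuming a toy plan representation. The OpKind enum, the map-based plan, and 
the method firstDisallowedPredecessor are all hypothetical and are not Pig's 
LogicalPlan API. The sketch only checks operator kinds; it does not verify 
that a ForEach actually preserves order or join key position.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of a plan; not Pig's LogicalPlan classes.
final class MergeJoinWhitelistSketch {
    enum OpKind { LOAD, FOREACH, FILTER, SORT, STREAM, JOIN }

    // Whitelist: a new operator kind is rejected until it is added here.
    private static final Set<OpKind> ALLOWED =
            EnumSet.of(OpKind.LOAD, OpKind.FOREACH, OpKind.FILTER);

    /**
     * Walks every operator upstream of the join, all the way to the loads.
     * preds maps an operator name to the names of its direct predecessors.
     * Returns the name of the first disallowed operator, or null if none.
     */
    static String firstDisallowedPredecessor(String join,
                                             Map<String, List<String>> preds,
                                             Map<String, OpKind> kinds) {
        Deque<String> toVisit = new ArrayDeque<>(
                preds.getOrDefault(join, Collections.<String>emptyList()));
        Set<String> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            String op = toVisit.pop();
            if (!seen.add(op)) {
                continue; // already checked via another path
            }
            // Operators of unknown kind (null) are also treated as disallowed.
            if (!ALLOWED.contains(kinds.get(op))) {
                return op;
            }
            toVisit.addAll(preds.getOrDefault(op, Collections.<String>emptyList()));
        }
        return null;
    }
}
```

A caller would turn a non-null result into a user input error rather than a 
bug, in line with the note on the exception source.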

Again for the successor, all successors from mergejoin down to map leaf should 
be checked to ensure stream is absent (really there should be no restriction on 
stream being present after the join - if there is an issue currently with this, 
it is fine to not allow stream but eventually it would be good to not have any 
restriction on what follows the merge join). You can just use a visitor to 
check presence of stream in the plan - this should be done after complete 
LogToPhyTranslation is done - in visit() so that the whole plan can be looked 

7) Is MRStreamHandler.java now replaced by 

8) Some of the MRCompilerExceptions do not follow the error handling spec 
(errcode, errMsg, Src).

9) Should assert() statements in MRCompiler be replaced with Exceptions, since 
assertions are disabled by default in Java?
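
To illustrate the concern in item 9: a Java assert is skipped unless the JVM 
is started with -ea, whereas an explicit check always runs. The sketch below 
is hypothetical; the method requireSingleLeaf and the condition it checks are 
made up for illustration, and IllegalStateException stands in for whatever 
exception type the compiler would actually throw.

```java
// Hypothetical example; not taken from MRCompiler.
final class AssertVsExceptionSketch {
    static int requireSingleLeaf(int leafCount) {
        // Before: "assert leafCount == 1;" is silently skipped without -ea.
        // After: the explicit check below runs regardless of JVM flags.
        if (leafCount != 1) {
            throw new IllegalStateException(
                    "Expected exactly one leaf in the plan, found " + leafCount);
        }
        return leafCount;
    }

    public static void main(String[] args) {
        System.out.println(requireSingleLeaf(1));
        try {
            requireSingleLeaf(2);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```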

10) In MRCompiler.java I wonder if you should change
                    rightMapPlan.disconnect(rightLoader, loadSucc);
We really want to remove all operators in rightMapPlan other than the loader.

11) We should note in documentation that merge join only works for data sorted 
in ascending order. (the MRCompiler code assumes this - we should have sort 
check if possible - see performance comment below)

12) It would be good to add a couple of unit tests with a few operators after 
merge join to ensure merge join operates well with successors in the plan.

13) In POMergeJoin.java, comments about foreach should be cleaned up since 
foreach is no longer used. For example:
//variable which denotes whether we are returning tuples from the foreach 

The following code can be factored out into a function since it is repeated twice:
               case POStatus.STATUS_EOP:
                    // Current file has ended. Need to open next file by
                    // reading next index entry.
                    String prevFile = rightLoader.getLFile().getFileName();
                    while(true){
                        // But next file may be same as previous one, because
                        // index may contain multiple entries for same file.
                        Tuple idxEntry = index.poll();
                        // Index is finished too. Right stream is finished.
                        // No more tuples.
                        if(null == idxEntry)
                            return res;
                                res = rightLoader.getNext(dummyTuple);
                                return this.getNextRightInp();


A couple of things to try and check impact on performance:
1) Introduce checks for sortedness of inputs to merge join
2) Increase sample size from 1 per map to say 10 per map
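
A sortedness check like the one in suggestion 1 could be sketched as below, 
assuming join keys arrive as non-null Comparable values in stream order. The 
class and method names are hypothetical and this is not tied to Pig's Tuple 
or loader classes. It costs one comparison per record, which is the overhead 
whose performance impact would need to be measured.

```java
import java.util.Arrays;

// Hypothetical helper; assumes non-null keys with a natural ordering.
final class SortednessCheckSketch {
    /**
     * Returns the index of the first key that is smaller than the key before
     * it, or -1 if the keys are in ascending (non-decreasing) order.
     */
    static <K extends Comparable<? super K>> int firstOutOfOrder(Iterable<K> keys) {
        K prev = null;
        int i = 0;
        for (K key : keys) {
            if (prev != null && key.compareTo(prev) < 0) {
                return i;
            }
            prev = key;
            i++;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstOutOfOrder(Arrays.asList(1, 2, 2, 5)));
        System.out.println(firstOutOfOrder(Arrays.asList(1, 3, 2)));
    }
}
```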

> -----------------------
>                 Key: PIG-845
>                 URL: https://issues.apache.org/jira/browse/PIG-845
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Olga Natkovich
>            Assignee: Ashutosh Chauhan
>         Attachments: merge-join-1.patch, merge-join-for-review.patch
> This join would work if the data for both tables is sorted on the join key.
