[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-15 Thread Hudson (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12743685#action_12743685
 ] 

Hudson commented on PIG-845:


Integrated in Pig-trunk #523 (See 
[http://hudson.zones.apache.org/hudson/job/Pig-trunk/523/])
: PERFORMANCE: Merge Join (ashutoshc via pradeepkth) - deleting renamed 
file - MRStreamHandler.java
: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)


 PERFORMANCE: Merge Join
 ---

 Key: PIG-845
 URL: https://issues.apache.org/jira/browse/PIG-845
 Project: Pig
  Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
 Attachments: merge-join.patch


 Thsi join would work if the data for both tables is sorted on the join key.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-13 Thread Hadoop QA (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12743045#action_12743045
 ] 

Hadoop QA commented on PIG-845:
---

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12416501/merge-join.patch
  against trunk revision 803377.

+1 @author.  The patch does not contain any @author tags.

+1 tests included.  The patch appears to include 13 new or modified tests.

+1 javadoc.  The javadoc tool did not generate any warning messages.

+1 javac.  The applied patch does not increase the total number of javac 
compiler warnings.

+1 findbugs.  The patch does not introduce any new Findbugs warnings.

-1 release audit.  The applied patch generated 162 release audit warnings 
(more than the trunk's current 161 warnings).

+1 core tests.  The patch passed core unit tests.

+1 contrib tests.  The patch passed contrib unit tests.

Test results: 
http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/testReport/
Release audit warnings: 
http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/artifact/trunk/patchprocess/releaseAuditDiffWarnings.txt
Findbugs warnings: 
http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Console output: 
http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/console

This message is automatically generated.

 PERFORMANCE: Merge Join
 ---

 Key: PIG-845
 URL: https://issues.apache.org/jira/browse/PIG-845
 Project: Pig
  Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
 Attachments: merge-join.patch, merge-join.patch


 Thsi join would work if the data for both tables is sorted on the join key.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-12 Thread Dmitriy V. Ryaboy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12742562#action_12742562
 ] 

Dmitriy V. Ryaboy commented on PIG-845:
---

Alan, Ashutosh -- maybe I am misunderstanding where null keys come from in the 
Indexer. I assumed this was due to the processing that happens in the plan the 
indexer deserializes and attaches to its POLocalRearrange.

In regards to errors, I was referring to this:
{code}
catch(PlanException e){
int errCode = 2034;
String msg = Error compiling operator  + 
joinOp.getClass().getCanonicalName();
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
{code}

The only central place for error codes seems to be the Wiki.  A class with a 
bunch of static+final error codes would be a better place.


Ashutosh, I completely disagree with you on changing all tests to run in MR 
mode.  The tests are already impossible to run on a laptop (people, myself 
included, actually submit patches to jira just to see if tests pass).  Running 
in MR mode will incur significant overhead per test. Only things that actually 
rely on the MR bits should be tested in MR (and use mock objects if possible.. 
there's been some advancement on that front in Hadoop 20, I haven't looked at 
it yet).

Would love to see a more efficient indexing MR job (which will reduce load on 
the JT, keep schedules less busy, and incur less overhead in task startups by 
requiring fewer tasks), but perhaps not before 0.4 is out the door with 
existing functionality.  Just to be clear, I don't think more than 1 record per 
block is necessary, but more than one block per task would probably be a good 
thing.

Any thoughts on how to choose which of two relations to index? We get locality 
on the non-indexed relation, but not on the indexed one, which probably throws 
a kink in the normal way of thinking about this.



 PERFORMANCE: Merge Join
 ---

 Key: PIG-845
 URL: https://issues.apache.org/jira/browse/PIG-845
 Project: Pig
  Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
 Attachments: merge-join.patch


 Thsi join would work if the data for both tables is sorted on the join key.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.



[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-11 Thread Ashutosh Chauhan (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741733#action_12741733
 ] 

Ashutosh Chauhan commented on PIG-845:
--

Hi Dmitriy,

Thanks for review. Please find my comments inline.

1.
EndOfAllInput flags - could you add comments here about what the point of this 
flag is? You explain what EndOfAllInputSetter does (which is actually rather 
self-explanatory) but not what the meaning of the flag is and how it's used. 
There is a bit of an explanation in PigMapBase, but it really belongs here.
 EndofAllInput flag is basically a flag to indicate that on close() call of 
 map/reduce task, run the pipeline once more. Till now it was used only by 
 POStream, but now POMergeJoin also make use of it.

2.
Could you explain the relationship between EndOfAllInput and (deleted) POStream?
 POStream is still there, I guess you are referring to MRStreamHandler which 
 is deleted. Its renaming of class. Now that POMergeJoin also makes use of 
 it, its better to give it a generic name like EndOfAllInput instead of 
 MRStreamHandler.

3.
Comments in MRCompiler alternate between referring to the left MROp as 
LeftMROper and curMROper. Choose one.
 Ya, will update the comments.

4.
I am curious about the decision to throw compiler exceptions if MergeJoin 
requirements re number of inputs, etc, aren't satisfied. It seems like a better 
user experience would be to log a warning and fall back to a regular join.
 Ya, a good suggestion. It would be straight forward to do it while parsing 
 (e.g. when there are more then two inputs). Though its not straight forward 
 to do at logical to physical plan and physical to MRJobs translation time. 

5.
Style notes for visitMergeJoin:

It's a 200-line method. Any way you can break it up into smaller components? As 
is, it's hard to follow.
 I can break it up, but that will bloat the MRCompiler class size. Better 
 idea is to have MRCompilerHelper or some such class where all the low level 
 helper function lives, so that MRCompiler itself is small and thus easier to 
 read. 

The if statements should be broken up into multiple lines to agree with the 
style guides.

Variable naming: you've got topPrj, prj, pkg, lr, ce, nig.. one at a time they 
are fine, but together in a 200-line method they are undreadable. Please 
consider more descriptive names.
 Will use more descriptive names in next patch.

6.
Kind of a global comment, since it applies to more than just MergeJoin:

It seems to me like we need a Builder for operators to clean up some of the 
new, set, set, set stuff.

Having the setters return this and a Plan's add() method return the plan, would 
let us replace this:

POProject topPrj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
topPrj.setResultType(DataType.TUPLE);
topPrj.setOverloaded(true);
rightMROpr.reducePlan.add(topPrj);
rightMROpr.reducePlan.connect(pkg, topPrj);

with this:

POProject topPrj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)))
.setColumn(1).setResultType(DataType.TUPLE)
.setOverloaded(true);

rightMROpr.reducePlan.add(topPrj).connect(pkg, topPrj)

I agree. At many places there are too many parameters to set. Setters should 
be smart and should return the object instead of being void and then this 
idea of chaining will help to cut down the number of lines. 

7.
Is the change to ListListByte keyTypes in POFRJoin related to MergeJoin or 
just rolled in?
POFRJoin can do without this change, but to avoid code duplication, I update 
the POFRJoin to use ListListByte keyTypes.

8. MergeJoin

break getNext() into components.
 I dont want to do that because it already has lots of class members which 
 are getting updated at various places. Making those variables live in 
 multiple functions will make logic even more harder to follow. Also, I am 
 not sure if java compiler can always inline the private methods.

I don't see you supporting Left outer joins. Plans for that? At least document 
the planned approach.
 Ya, outer joins are currently not supported. Its documented in 
 specification. Will include comment in code also.

Error codes being declared deep inside classes, and documented on the wiki, is 
a poor practice, imo. They should be pulled out into PigErrors (as lightweight 
final objects that have an error code, a name, and a description..) I thought 
Santhosh made progress on this already, no?
 Not sure if I understand you completely. I am using ExecException, 
 FrontEndException etc. Arent these are lightweight final objects you are 
 referring to ?

Could you explain the problem with splits and streams? Why can't this work for 
them?
 Streaming after the join will be supported. There was a bug which I fixed 
 and will be a part of next patch. Streaming before Join will not be 
 supported because in endOfAllInput case, streaming may potentially produce 
 multiple tuples 

[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-10 Thread Dmitriy V. Ryaboy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741589#action_12741589
 ] 

Dmitriy V. Ryaboy commented on PIG-845:
---

Some Comments below.
It's a big patch, so a lot of comments...

1. 
EndOfAllInput flags -- could you add comments here about what the point of this 
flag is? You explain what EndOfAllInputSetter does (which is actually rather 
self-explanatory) but not what the meaning of the flag is and how it's used. 
There is a bit of an explanation in PigMapBase, but it really belongs here.

2.
Could you explain the relationship between EndOfAllInput and (deleted) POStream?

3.
Comments in MRCompiler alternate between referring to the left MROp as 
LeftMROper and curMROper. Choose one.

4.
I am curious about the decision to throw compiler exceptions if MergeJoin 
requirements re number of inputs, etc, aren't satisfied. It seems like a better 
user experience would be to log a warning and fall back to a regular join.

5.
Style notes for visitMergeJoin: 

It's a 200-line method. Any way you can break it up into smaller components? As 
is, it's hard to follow.

The if statements should be broken up into multiple lines to agree with the 
style guides.

Variable naming: you've got topPrj, prj, pkg, lr, ce, nig.. one at a time they 
are fine, but together in a 200-line method they are undreadable. Please 
consider more descriptive names.

6.
Kind of a global comment, since it applies to more than just MergeJoin:

It seems to me like we need a Builder for operators to clean up some of the 
new, set, set, set stuff.

Having the setters return this and a Plan's add() method return the plan, would 
let us replace this:

POProject topPrj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
topPrj.setResultType(DataType.TUPLE);
topPrj.setOverloaded(true);
rightMROpr.reducePlan.add(topPrj);
rightMROpr.reducePlan.connect(pkg, topPrj);

with this:

POProject topPrj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)))
.setColumn(1).setResultType(DataType.TUPLE)
.setOverloaded(true);

rightMROpr.reducePlan.add(topPrj).connect(pkg, topPrj)


7.
Is the change to ListListByte keyTypes in POFRJoin related to MergeJoin or 
just rolled in?

8. MergeJoin

break getNext() into components.

I don't see you supporting Left outer joins. Plans for that? At least document 
the planned approach.

Error codes being declared deep inside classes, and documented on the wiki, is 
a poor practice, imo. They should be pulled out into PigErrors (as lightweight 
final objects that have an error code, a name, and a description..) I thought 
Santhosh made progress on this already, no?

Could you explain the problem with splits and streams? Why can't this work for 
them?


9. Sampler/Indexer:
9a
Looks like you create the same number of map tasks for this as you do for a 
join; all a sampling map task does is read one record and emit a single tuple.  
That seems wasteful; there is a lot of overhead in setting up these tiny jobs 
which might get stuck behind other jobs running on the cluster, etc. If the 
underlying file has syncpoints, a smaller number of MR tasks can be created. If 
we know the ratio of sample tasks to full tasks, we can figure out how many 
records we should emit per job ( ceil(full_tasks/sample_tasks) ).  We can 
approximately achieve this through seeking trough (end-offset)/num_to_emit and 
doing a sync() after that seek. It's approximate, but close enough for an index.

9b
Consider renaming to something like SortedFileIndexer, since it's coneivable 
that this component can be reused in a context other than a Merge Join.

10.
Would it make sense to expose this to the users via a 'CREATE INDEX' (or 
similar) command?
That way the index could be persisted, and the user could tell you to use an 
existing index instead of rescanning the data.

11.
I am not sure about the approach of pushing sampling above filters. Have you 
guys benchmarked this? Seems like you'd wind up reading the whole file in the 
sample job if the filter is selective enough (and high filter selectivity would 
also make materialize-sample go much faster).

Testing: 
12a
You should test for refusal to do 3-way join and other error condition (or a 
warning and successful failover to regular join -- my preference)

12b
You should do a proper unit test for the MergeJoinIndexer (or whatever we are 
calling it).



 PERFORMANCE: Merge Join
 ---

 Key: PIG-845
 URL: https://issues.apache.org/jira/browse/PIG-845
 Project: Pig
  Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
 Attachments: merge-join-1.patch, merge-join-for-review.patch


 Thsi join would work if the data for both tables is sorted on the join key.

-- 
This message is 

[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-08-10 Thread Pradeep Kamath (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741621#action_12741621
 ] 

Pradeep Kamath commented on PIG-845:


Review comments:
1) In LogicalPlanTester.java, why is the following change required?
{noformat}
@@ -198,7 +198,7 @@
 private LogicalPlan buildPlan(String query, ClassLoader cldr) {
 
 LogicalPlanBuilder.classloader = 
LogicalPlanTester.class.getClassLoader() ;
-PigContext pigContext = new PigContext(ExecType.LOCAL, new 
Properties());
+PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new 
Properties());
 try {
 pigContext.connect();
 } catch (ExecException e1) {
{noformat}

Typically when PigContext is constructed in Map-reduce mode, the properties 
should correspond to the cluster configuration. So the above initialization 
seems odd because the Properties object is an empty object in the constructor 
call above.

2) In PigMapBase.java:

public static final String END_OF_INP_IN_MAP = pig.stream.in.map;

can change to

public static final String END_OF_INP_IN_MAP = pig.blocking.operator.in.map; 
and this should be put as a public static member of JobControlCompiler.

In JobControlCompiler.java,

jobConf.set(pig.stream.in.map, true);  should change to use the above 
public static String.


3) Remove the following comment in QueryParser.jjt (line 302):
{code}
* Join parser. Currently can only handle skewed joins.
{code}

4) In QueryParser.jjt the joinPlans passed to LOJoin constructor is not a 
LinkedMultiMap
but in LogToPhyTranslationVistior the join plans are put in a LinkedMultiMap. 
If order is
important, shouldn't QueryParser.jjt also change?

5) Some comments in LogToPhyTranslationVisitor about the different lists and 
maps would help :)

6) In validateMergeJoin() - the code only considers direct successors and 
predecessors of LOJoin. It should check the entire plan and ensure that 
predecessors of LOJoin all the way to the LOLoad are only LOForEach and 
LOFilter. Strictly we should not allow LOForeach since it could change sort 
order or position of join keys and hence invalidate the index - but we need it
so that the Foreach introduced by the TypeCastInserter when there is a schema 
for either of the inputs remains. You should note in the documentation that 
only Order and join key position preserving Foreachs and Filters are allowed as 
predecessors to merge join and check the same in validateMergeJoin() - it is 
better to use a whitelist of allowed operators than a blacklist
of disallowed once (since then the blacklist would need to be updated anytime a 
new operator comes along. The exception source here is not really a bug but a 
user input error since merge join really doesnot support other ops.

Again for the successor, all successors from mergejoin down to map leaf should 
be checked to ensure stream is absent (really there should be no restriction on 
stream being present after the join - if there is an issue currently with this, 
it is fine to not allow stream but eventually it would be good to not have any 
restriction on what follows the merge join). You can just use a visitor to 
check presence of stream in the plan - this should be done after complete 
LogToPhyTranslation is done - in visit() so that the whole plan can be looked 
at.

7) Is MRStreamHandler.java now replaced by 
/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 ?

8) Some of MRCompilerExceptions do not follow the Error handling spec - 
errcode, errMsg, Src

9) Should assert() statements in MRCompiler be replaced with Exceptions since 
assertions are disabled by default in Java.

10) In MRCompiler.java I wonder if you should change
{code}
rightMapPlan.disconnect(rightLoader, loadSucc);
rightMapPlan.remove(loadSucc);
{code}
to
{code}
rightMapPlan.trimBelow(rightLoader);
{code}
We really want to remove all operators in rightMapPlan other than the loader.

11) We should note in documentation that merge join only works for data sorted 
in ascending order. (the MRCompiler code assumes this - we should have sort 
check if possible - see performance comment below)

12) It would be good to add a couple of unit tests with a few operators after 
merge join to ensure merge join operators well with successors in the plan.

13) In POMergeJoin.java, comments about foreach should be cleaned up since 
foreach is no longer used. For example:
{code}
//variable which denotes whether we are returning tuples from the foreach 
operator
{code}

The following code can be factored out into a function since its repeated twice:
{code}
   case POStatus.STATUS_EOP:  // Current file has ended. 
Need to open next file by reading next index entry.
String prevFile = 

[jira] Commented: (PIG-845) PERFORMANCE: Merge Join

2009-07-30 Thread Pradeep Kamath (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12737377#action_12737377
 ] 

Pradeep Kamath commented on PIG-845:


Some initial comments on POMergeJoin.java:

If status is not OK - it shuld just be returned (no run time
exception like above) - similar comments for other places in POMergeJoin where 
there is
a switch case on processInput() - once this change is made, the code in 
if(processingFE) also
will need to change accordingly
{code}
if(firstTime){
// Do initial setup.
curLeftInp = processInput();
switch(curLeftInp.returnStatus){
case POStatus.STATUS_OK:
break;

case POStatus.STATUS_EOP: // Return because we want to fetch next 
left tuple.
return curLeftInp;
default:
throw new RuntimeException(Unexpected Status);
}
{code}

All non RuntimeExceptions should follow error handling specification by using 
the correct Exception created with error code, cause, message, src constructor.
http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#head-9f71d78d362c3307711f98ec9db3ee12b55e92f6
 should be updated with new error code #

detachInput() is not required in POMergeJoin - processInput takes care of it

IN the code below, we could cache away the key to be used while processFE is 
true as processFEKey and then we need not
extract key for each join
// Cant use the prevLeftKey, because we are reading ahead.
// Need key of current bag. Since we have just 
finished doing the join
// bag must contain atleast one element.
res.returnStatus = POStatus.STATUS_OK;
res.result = leftTuples.get(0);
curLeftKey = extractKeysFromTuple(res, 0);


 PERFORMANCE: Merge Join
 ---

 Key: PIG-845
 URL: https://issues.apache.org/jira/browse/PIG-845
 Project: Pig
  Issue Type: Improvement
Reporter: Olga Natkovich
 Attachments: merge-join-for-review.patch


 Thsi join would work if the data for both tables is sorted on the join key.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.