[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12743685#action_12743685 ]
Hudson commented on PIG-845:
Integrated in Pig-trunk #523 (See [http://hudson.zones.apache.org/hudson/job/Pig-trunk/523/])
: PERFORMANCE: Merge Join (ashutoshc via pradeepkth) - deleting renamed file - MRStreamHandler.java
: PERFORMANCE: Merge Join (ashutoshc via pradeepkth)

PERFORMANCE: Merge Join
---
Key: PIG-845
URL: https://issues.apache.org/jira/browse/PIG-845
Project: Pig
Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
Attachments: merge-join.patch

This join would work if the data for both tables is sorted on the join key.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12743045#action_12743045 ]
Hadoop QA commented on PIG-845:
---
-1 overall. Here are the results of testing the latest attachment http://issues.apache.org/jira/secure/attachment/12416501/merge-join.patch against trunk revision 803377.

+1 @author. The patch does not contain any @author tags.
+1 tests included. The patch appears to include 13 new or modified tests.
+1 javadoc. The javadoc tool did not generate any warning messages.
+1 javac. The applied patch does not increase the total number of javac compiler warnings.
+1 findbugs. The patch does not introduce any new Findbugs warnings.
-1 release audit. The applied patch generated 162 release audit warnings (more than the trunk's current 161 warnings).
+1 core tests. The patch passed core unit tests.
+1 contrib tests. The patch passed contrib unit tests.

Test results: http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/testReport/
Release audit warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/artifact/trunk/patchprocess/releaseAuditDiffWarnings.txt
Findbugs warnings: http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-minerva.apache.org/162/console

This message is automatically generated.

PERFORMANCE: Merge Join
---
Key: PIG-845
URL: https://issues.apache.org/jira/browse/PIG-845
Project: Pig
Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
Attachments: merge-join.patch, merge-join.patch

This join would work if the data for both tables is sorted on the join key.
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12742562#action_12742562 ]
Dmitriy V. Ryaboy commented on PIG-845:
---
Alan, Ashutosh -- maybe I am misunderstanding where null keys come from in the Indexer. I assumed this was due to the processing that happens in the plan the indexer deserializes and attaches to its POLocalRearrange.

With regard to errors, I was referring to this:
{code}
catch(PlanException e){
    int errCode = 2034;
    String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
    throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
{code}
The only central place for error codes seems to be the Wiki. A class with a bunch of static+final error codes would be a better place.

Ashutosh, I completely disagree with you on changing all tests to run in MR mode. The tests are already impossible to run on a laptop (people, myself included, actually submit patches to jira just to see if tests pass). Running in MR mode will incur significant overhead per test. Only things that actually rely on the MR bits should be tested in MR (and use mock objects if possible; there's been some advancement on that front in Hadoop 20, I haven't looked at it yet).

I would love to see a more efficient indexing MR job (which will reduce load on the JT, keep schedules less busy, and incur less overhead in task startups by requiring fewer tasks), but perhaps not before 0.4 is out the door with existing functionality. Just to be clear, I don't think more than 1 record per block is necessary, but more than one block per task would probably be a good thing.

Any thoughts on how to choose which of two relations to index? We get locality on the non-indexed relation, but not on the indexed one, which probably throws a kink in the normal way of thinking about this.
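As a rough illustration of the "static+final error codes" suggestion in this comment, the sketch below collects a code and its description in one enum. The name `PigErrors`, the constant name, and the `message` helper are hypothetical and are not part of Pig; only the code 2034 and the text "Error compiling operator" come from the snippet quoted in the comment.

```java
// Hypothetical sketch: PigErrors is an illustrative name, not an existing Pig class.
enum PigErrors {
    // Code 2034 and its text are taken from the MRCompiler snippet quoted in the comment.
    MR_COMPILE_OPERATOR(2034, "Error compiling operator");

    private final int code;
    private final String description;

    PigErrors(int code, String description) {
        this.code = code;
        this.description = description;
    }

    int code() {
        return code;
    }

    String description() {
        return description;
    }

    // Builds the message in one place so call sites do not repeat the literal text.
    String message(String detail) {
        return "ERROR " + code + ": " + description + " " + detail;
    }
}
```

A call site could then pass `PigErrors.MR_COMPILE_OPERATOR.code()` and `PigErrors.MR_COMPILE_OPERATOR.message(className)` to the exception constructor instead of declaring the number and string locally.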
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741733#action_12741733 ]
Ashutosh Chauhan commented on PIG-845:
--
Hi Dmitriy,

Thanks for the review. Please find my comments inline.

> 1. EndOfAllInput flags - could you add comments here about what the point of this flag is? You explain what EndOfAllInputSetter does (which is actually rather self-explanatory) but not what the meaning of the flag is and how it's used. There is a bit of an explanation in PigMapBase, but it really belongs here.

The EndOfAllInput flag indicates that the pipeline should be run once more when close() is called on the map/reduce task. Until now it was used only by POStream, but now POMergeJoin also makes use of it.

> 2. Could you explain the relationship between EndOfAllInput and (deleted) POStream?

POStream is still there; I guess you are referring to MRStreamHandler, which is deleted. It is a renaming of the class. Now that POMergeJoin also makes use of it, it's better to give it a generic name like EndOfAllInput instead of MRStreamHandler.

> 3. Comments in MRCompiler alternate between referring to the left MROp as LeftMROper and curMROper. Choose one.

Ya, will update the comments.

> 4. I am curious about the decision to throw compiler exceptions if MergeJoin requirements re number of inputs, etc, aren't satisfied. It seems like a better user experience would be to log a warning and fall back to a regular join.

Ya, a good suggestion. It would be straightforward to do it while parsing (e.g. when there are more than two inputs), though it's not straightforward to do at logical-to-physical plan and physical-to-MRJobs translation time.

> 5. Style notes for visitMergeJoin: It's a 200-line method. Any way you can break it up into smaller components? As is, it's hard to follow.

I can break it up, but that will bloat the MRCompiler class size. A better idea is to have MRCompilerHelper or some such class where all the low-level helper functions live, so that MRCompiler itself is small and thus easier to read.

> The if statements should be broken up into multiple lines to agree with the style guides. Variable naming: you've got topPrj, prj, pkg, lr, ce, nig.. one at a time they are fine, but together in a 200-line method they are unreadable. Please consider more descriptive names.

Will use more descriptive names in the next patch.

> 6. Kind of a global comment, since it applies to more than just MergeJoin: It seems to me like we need a Builder for operators to clean up some of the new, set, set, set stuff. Having the setters return this and a Plan's add() method return the plan, would let us replace this:
>
>     POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
>     topPrj.setColumn(1);
>     topPrj.setResultType(DataType.TUPLE);
>     topPrj.setOverloaded(true);
>     rightMROpr.reducePlan.add(topPrj);
>     rightMROpr.reducePlan.connect(pkg, topPrj);
>
> with this:
>
>     POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)))
>         .setColumn(1).setResultType(DataType.TUPLE)
>         .setOverloaded(true);
>     rightMROpr.reducePlan.add(topPrj).connect(pkg, topPrj)

I agree. In many places there are too many parameters to set. Setters should return the object instead of being void; chaining will then help cut down the number of lines.

> 7. Is the change to List<List<Byte>> keyTypes in POFRJoin related to MergeJoin or just rolled in?

POFRJoin can do without this change, but to avoid code duplication I updated POFRJoin to use List<List<Byte>> keyTypes.

> 8. MergeJoin break getNext() into components.

I don't want to do that because it already has lots of class members which are updated at various places. Spreading those variables across multiple functions will make the logic even harder to follow. Also, I am not sure the Java compiler can always inline the private methods.

> I don't see you supporting Left outer joins. Plans for that? At least document the planned approach.

Ya, outer joins are currently not supported. It's documented in the specification. Will include a comment in the code also.

> Error codes being declared deep inside classes, and documented on the wiki, is a poor practice, imo. They should be pulled out into PigErrors (as lightweight final objects that have an error code, a name, and a description..) I thought Santhosh made progress on this already, no?

Not sure if I understand you completely. I am using ExecException, FrontEndException etc. Aren't these the lightweight final objects you are referring to?

> Could you explain the problem with splits and streams? Why can't this work for them?

Streaming after the join will be supported. There was a bug which I fixed, and the fix will be part of the next patch. Streaming before the join will not be supported because in the endOfAllInput case, streaming may potentially produce multiple tuples
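The chaining idea from item 6 can be sketched with stand-in classes. `FluentProject`, `FluentPlan`, and `TUPLE_TYPE` below are hypothetical and only mimic the shape of POProject and a physical plan; they are not Pig's classes, and the type constant is a placeholder value. The point is only that setters returning `this` and an `add()` returning the plan allow the shorter form quoted in the review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator such as POProject; not Pig's real class.
class FluentProject {
    static final byte TUPLE_TYPE = 1; // placeholder value, not Pig's DataType.TUPLE

    private int column;
    private byte resultType;
    private boolean overloaded;

    // Each setter returns this so that calls can be chained.
    FluentProject setColumn(int column) {
        this.column = column;
        return this;
    }

    FluentProject setResultType(byte resultType) {
        this.resultType = resultType;
        return this;
    }

    FluentProject setOverloaded(boolean overloaded) {
        this.overloaded = overloaded;
        return this;
    }

    int getColumn() { return column; }
    byte getResultType() { return resultType; }
    boolean isOverloaded() { return overloaded; }
}

// Hypothetical stand-in for a plan whose add() and connect() return the plan itself.
class FluentPlan {
    private final List<FluentProject> operators = new ArrayList<>();
    private final List<FluentProject[]> edges = new ArrayList<>();

    FluentPlan add(FluentProject op) {
        operators.add(op);
        return this;
    }

    FluentPlan connect(FluentProject from, FluentProject to) {
        edges.add(new FluentProject[] {from, to});
        return this;
    }

    int operatorCount() { return operators.size(); }
    int edgeCount() { return edges.size(); }
}
```

With these stand-ins, `new FluentPlan().add(pkg).add(topPrj).connect(pkg, topPrj)` replaces three separate statements. One trade-off of this design is that chained setters no longer follow the strict JavaBeans convention of void setters, which matters only if some tool relies on that convention.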
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741589#action_12741589 ]
Dmitriy V. Ryaboy commented on PIG-845:
---
Some comments below. It's a big patch, so a lot of comments...

1. EndOfAllInput flags -- could you add comments here about what the point of this flag is? You explain what EndOfAllInputSetter does (which is actually rather self-explanatory) but not what the meaning of the flag is and how it's used. There is a bit of an explanation in PigMapBase, but it really belongs here.

2. Could you explain the relationship between EndOfAllInput and (deleted) POStream?

3. Comments in MRCompiler alternate between referring to the left MROp as LeftMROper and curMROper. Choose one.

4. I am curious about the decision to throw compiler exceptions if MergeJoin requirements re number of inputs, etc, aren't satisfied. It seems like a better user experience would be to log a warning and fall back to a regular join.

5. Style notes for visitMergeJoin: It's a 200-line method. Any way you can break it up into smaller components? As is, it's hard to follow. The if statements should be broken up into multiple lines to agree with the style guides. Variable naming: you've got topPrj, prj, pkg, lr, ce, nig.. one at a time they are fine, but together in a 200-line method they are unreadable. Please consider more descriptive names.

6. Kind of a global comment, since it applies to more than just MergeJoin: It seems to me like we need a Builder for operators to clean up some of the new, set, set, set stuff. Having the setters return this and a Plan's add() method return the plan, would let us replace this:

    POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
    topPrj.setColumn(1);
    topPrj.setResultType(DataType.TUPLE);
    topPrj.setOverloaded(true);
    rightMROpr.reducePlan.add(topPrj);
    rightMROpr.reducePlan.connect(pkg, topPrj);

with this:

    POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)))
        .setColumn(1).setResultType(DataType.TUPLE)
        .setOverloaded(true);
    rightMROpr.reducePlan.add(topPrj).connect(pkg, topPrj)

7. Is the change to List<List<Byte>> keyTypes in POFRJoin related to MergeJoin or just rolled in?

8. MergeJoin: break getNext() into components. I don't see you supporting Left outer joins. Plans for that? At least document the planned approach. Error codes being declared deep inside classes, and documented on the wiki, is a poor practice, imo. They should be pulled out into PigErrors (as lightweight final objects that have an error code, a name, and a description..) I thought Santhosh made progress on this already, no? Could you explain the problem with splits and streams? Why can't this work for them?

9. Sampler/Indexer:

9a Looks like you create the same number of map tasks for this as you do for a join; all a sampling map task does is read one record and emit a single tuple. That seems wasteful; there is a lot of overhead in setting up these tiny jobs which might get stuck behind other jobs running on the cluster, etc. If the underlying file has syncpoints, a smaller number of MR tasks can be created. If we know the ratio of sample tasks to full tasks, we can figure out how many records we should emit per job ( ceil(full_tasks/sample_tasks) ). We can approximately achieve this through seeking through (end-offset)/num_to_emit and doing a sync() after that seek. It's approximate, but close enough for an index.

9b Consider renaming to something like SortedFileIndexer, since it's conceivable that this component can be reused in a context other than a Merge Join.

10. Would it make sense to expose this to the users via a 'CREATE INDEX' (or similar) command? That way the index could be persisted, and the user could tell you to use an existing index instead of rescanning the data.

11. I am not sure about the approach of pushing sampling above filters. Have you guys benchmarked this? Seems like you'd wind up reading the whole file in the sample job if the filter is selective enough (and high filter selectivity would also make materialize-sample go much faster).

Testing:

12a You should test for refusal to do a 3-way join and other error conditions (or a warning and successful failover to a regular join -- my preference).

12b You should do a proper unit test for the MergeJoinIndexer (or whatever we are calling it).

PERFORMANCE: Merge Join
---
Key: PIG-845
URL: https://issues.apache.org/jira/browse/PIG-845
Project: Pig
Issue Type: Improvement
Reporter: Olga Natkovich
Assignee: Ashutosh Chauhan
Attachments: merge-join-1.patch, merge-join-for-review.patch

This join would work if the data for both tables is sorted on the join key.
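The arithmetic in item 9a of the review can be written out as a small helper. This is only one reading of the comment: `SampleMath`, its method names, and the interpretation of "(end-offset)/num_to_emit" as a seek stride between samples are assumptions, and nothing here touches Hadoop's actual split or sync APIs.

```java
// Hypothetical helper illustrating the arithmetic in item 9a; not part of Pig or Hadoop.
final class SampleMath {
    private SampleMath() {}

    // ceil(fullTasks / sampleTasks) using integer arithmetic.
    static long recordsPerSampleTask(int fullTasks, int sampleTasks) {
        if (fullTasks < 0 || sampleTasks <= 0) {
            throw new IllegalArgumentException("fullTasks must be >= 0 and sampleTasks > 0");
        }
        return ((long) fullTasks + sampleTasks - 1) / sampleTasks;
    }

    // Assumed reading of "(end-offset)/num_to_emit": the byte distance to seek
    // between samples, after which the reader would align to the next sync point.
    static long seekStride(long startOffset, long endOffset, long numToEmit) {
        if (endOffset < startOffset || numToEmit <= 0) {
            throw new IllegalArgumentException("need endOffset >= startOffset and numToEmit > 0");
        }
        return (endOffset - startOffset) / numToEmit;
    }
}
```

For example, with 10 full tasks and 3 sample tasks each sample task would emit 4 records, and a 1000-byte range sampled 4 times gives a stride of 250 bytes before each sync.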
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12741621#action_12741621 ]
Pradeep Kamath commented on PIG-845:

Review comments:

1) In LogicalPlanTester.java, why is the following change required?
{noformat}
@@ -198,7 +198,7 @@
 private LogicalPlan buildPlan(String query, ClassLoader cldr) {
     LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
-    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
     try {
         pigContext.connect();
     } catch (ExecException e1) {
{noformat}
Typically when PigContext is constructed in Map-reduce mode, the properties should correspond to the cluster configuration. So the above initialization seems odd because the Properties object is an empty object in the constructor call above.

2) In PigMapBase.java:
public static final String END_OF_INP_IN_MAP = "pig.stream.in.map";
can change to
public static final String END_OF_INP_IN_MAP = "pig.blocking.operator.in.map";
and this should be put as a public static member of JobControlCompiler. In JobControlCompiler.java,
jobConf.set("pig.stream.in.map", "true");
should change to use the above public static String.

3) Remove the following comment in QueryParser.jjt (line 302):
{code}
* Join parser. Currently can only handle skewed joins.
{code}

4) In QueryParser.jjt the joinPlans passed to the LOJoin constructor is not a LinkedMultiMap, but in LogToPhyTranslationVisitor the join plans are put in a LinkedMultiMap. If order is important, shouldn't QueryParser.jjt also change?

5) Some comments in LogToPhyTranslationVisitor about the different lists and maps would help :)

6) In validateMergeJoin() - the code only considers direct successors and predecessors of LOJoin. It should check the entire plan and ensure that predecessors of LOJoin all the way to the LOLoad are only LOForEach and LOFilter. Strictly we should not allow LOForEach since it could change the sort order or the position of join keys and hence invalidate the index - but we need it so that the Foreach introduced by the TypeCastInserter, when there is a schema for either of the inputs, remains. You should note in the documentation that only order-preserving and join-key-position-preserving Foreachs and Filters are allowed as predecessors to merge join, and check the same in validateMergeJoin() - it is better to use a whitelist of allowed operators than a blacklist of disallowed ones (since the blacklist would need to be updated any time a new operator comes along). The exception source here is not really a bug but a user input error, since merge join really does not support other ops. Again for the successor, all successors from merge join down to the map leaf should be checked to ensure stream is absent (really there should be no restriction on stream being present after the join - if there is an issue currently with this, it is fine to not allow stream, but eventually it would be good to not have any restriction on what follows the merge join). You can just use a visitor to check for the presence of stream in the plan - this should be done after the complete LogToPhyTranslation is done - in visit(), so that the whole plan can be looked at.

7) Is MRStreamHandler.java now replaced by /org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java ?

8) Some of the MRCompilerExceptions do not follow the error handling spec - errcode, errMsg, Src

9) Should assert() statements in MRCompiler be replaced with Exceptions, since assertions are disabled by default in Java?

10) In MRCompiler.java I wonder if you should change
{code}
rightMapPlan.disconnect(rightLoader, loadSucc);
rightMapPlan.remove(loadSucc);
{code}
to
{code}
rightMapPlan.trimBelow(rightLoader);
{code}
We really want to remove all operators in rightMapPlan other than the loader.

11) We should note in the documentation that merge join only works for data sorted in ascending order. (The MRCompiler code assumes this - we should have a sort check if possible - see performance comment below.)

12) It would be good to add a couple of unit tests with a few operators after merge join to ensure merge join operates well with successors in the plan.

13) In POMergeJoin.java, comments about foreach should be cleaned up since foreach is no longer used. For example:
{code}
//variable which denotes whether we are returning tuples from the foreach operator
{code}
The following code can be factored out into a function since it's repeated twice:
{code}
case POStatus.STATUS_EOP:
    // Current file has ended. Need to open next file by reading next index entry.
    String prevFile =
[jira] Commented: (PIG-845) PERFORMANCE: Merge Join
[ https://issues.apache.org/jira/browse/PIG-845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12737377#action_12737377 ]
Pradeep Kamath commented on PIG-845:

Some initial comments on POMergeJoin.java:

If the status is not OK, it should just be returned (no runtime exception as in the code below). Similar comments apply to other places in POMergeJoin where there is a switch case on processInput(). Once this change is made, the code in if(processingFE) will also need to change accordingly.
{code}
if(firstTime){
    // Do initial setup.
    curLeftInp = processInput();
    switch(curLeftInp.returnStatus){
    case POStatus.STATUS_OK:
        break;
    case POStatus.STATUS_EOP:
        // Return because we want to fetch next left tuple.
        return curLeftInp;
    default:
        throw new RuntimeException("Unexpected Status");
    }
{code}
All non-RuntimeExceptions should follow the error handling specification by using the correct Exception created with the error code, cause, message, src constructor. http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#head-9f71d78d362c3307711f98ec9db3ee12b55e92f6 should be updated with the new error code.

# detachInput() is not required in POMergeJoin - processInput takes care of it.

In the code below, we could cache away the key to be used while processFE is true as processFEKey, and then we need not extract the key for each join:

    // Cant use the prevLeftKey, because we are reading ahead.
    // Need key of current bag. Since we have just finished doing the join
    // bag must contain atleast one element.
    res.returnStatus = POStatus.STATUS_OK;
    res.result = leftTuples.get(0);
    curLeftKey = extractKeysFromTuple(res, 0);

PERFORMANCE: Merge Join
---
Key: PIG-845
URL: https://issues.apache.org/jira/browse/PIG-845
Project: Pig
Issue Type: Improvement
Reporter: Olga Natkovich
Attachments: merge-join-for-review.patch

This join would work if the data for both tables is sorted on the join key.
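The first review point in the comment above (return any non-OK status to the caller instead of throwing) might look roughly like the following. `MergeJoinStatusSketch`, its nested `Result`, and the numeric status values are hypothetical stand-ins for POStatus and Result, chosen only so the sketch compiles on its own; the real setup work is replaced by a placeholder.

```java
// Hypothetical stand-ins for Pig's POStatus and Result; numeric values are placeholders.
final class MergeJoinStatusSketch {
    static final byte STATUS_OK = 0;
    static final byte STATUS_NULL = 1;
    static final byte STATUS_ERR = 2;
    static final byte STATUS_EOP = 3;

    static final class Result {
        final byte returnStatus;
        final Object result;

        Result(byte returnStatus, Object result) {
            this.returnStatus = returnStatus;
            this.result = result;
        }
    }

    private MergeJoinStatusSketch() {}

    // Mirrors the first-time setup step: any status other than OK is handed
    // straight back to the caller rather than raising a RuntimeException.
    static Result firstTimeSetup(Result curLeftInp) {
        if (curLeftInp.returnStatus != STATUS_OK) {
            return curLeftInp;
        }
        // Placeholder for the real setup work (key extraction, opening the right input).
        return new Result(STATUS_OK, "setup done for " + curLeftInp.result);
    }
}
```

The design choice here is that the operator does not need to enumerate every status: EOP, NULL, and ERR all propagate unchanged, and only the OK path continues into the join logic.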