[ https://issues.apache.org/jira/browse/PIG-1038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12775214#action_12775214 ]
Pradeep Kamath commented on PIG-1038:
-------------------------------------

Review comments:

In JobControlCompiler:
======================
The OutputValueGroupingComparator is a RawComparator - how are we ensuring that compare(WritableComparable a, WritableComparable b) is called?

{code}
622             if ((wa.getIndex() & PigNullableWritable.mqFlag) != 0) { // this is a multi-query index
{code}
Why do we only compare on index if this is true? The if-else in this block does not consider the case where both indices are same - is that by design?

{code}
653             } else if (wa.isNull() && wb.isNull()) {
{code}
In this block the case where both indices are same is not considered - is that by design?

The change in src/org/apache/pig/impl/io/PigNullableWritable.java seems unrelated to the patch.

In SecondaryKeyOptimizer.java:
==============================
{code}
154         else if (mapLeaf instanceof POUnion || mapLeaf instanceof POSplit) {
155             List<PhysicalOperator> preds = mr.mapPlan.getPredecessors(mapLeaf);
156             for (PhysicalOperator pred:preds)
157             {
158                 if (pred instanceof POLocalRearrange)
159                 {
160                     SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange)pred);
161                     sortKeyInfos.add(sortKeyInfo);
162                 }
163             }
{code}
If mapLeaf is a POSplit, the POSplit may have POLocalRearrange as the leaf (in multi query optimized queries) - should we be handling those?
Also, getSortKeyInfo() can return a null - so all places where getSortKeyInfo() is called, return value should be checked for null.

{code}
 98         List<Integer> columns = new ArrayList<Integer>();
 99         columns.add(rearrange.getIndex()&PigNullableWritable.idxSpace);
100         columnChainInfo.insert(false, columns, DataType.TUPLE)
{code}
Why does the column chain start with the index of LocalRearrange and why is the type tuple?
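To illustrate the null-return point above, here is a minimal sketch of the guard. The types are hypothetical stand-ins (a simplified SortKeyInfo, and strings in place of POLocalRearrange), not the classes in the patch; only the null handling is the point. Whether the right reaction to a null is to abandon the optimization, as assumed here, is for the patch author to decide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the patch's classes; only the null handling matters.
class NullCheckSketch {
    static class SortKeyInfo {
        final String description;
        SortKeyInfo(String description) { this.description = description; }
    }

    // Placeholder for getSortKeyInfo(): returns null when no sort key can be derived.
    static SortKeyInfo getSortKeyInfo(String rearrange) {
        if (rearrange == null || rearrange.isEmpty()) {
            return null;
        }
        return new SortKeyInfo("key-of-" + rearrange);
    }

    // Collects sort key info for every predecessor. Returns null if any
    // predecessor yields no info, so the caller can skip the optimization
    // (an assumption about the desired behavior).
    static List<SortKeyInfo> collect(List<String> preds) {
        List<SortKeyInfo> infos = new ArrayList<SortKeyInfo>();
        for (String pred : preds) {
            SortKeyInfo info = getSortKeyInfo(pred);
            if (info == null) {
                return null; // never add a null entry to the list
            }
            infos.add(info);
        }
        return infos;
    }

    public static void main(String[] args) {
        System.out.println(collect(Arrays.asList("lr1", "lr2")).size());
        System.out.println(collect(Arrays.asList("lr1", "")));
    }
}
```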
{code}
102         PhysicalOperator node = plan.getRoots().get(0);
103         while (node!=null)
104         {
105             if (node instanceof POProject) {
106                 POProject project = (POProject)node;
107
108                 columnChainInfo.insert(project.isStar(), project.getColumns(), project.getResultType());
109
110                 if (plan.getPredecessors(node)==null)
111                     node = null;
{code}
If node is initially the root, wouldn't plan.getPredecessors(node) always == null?

{code}
175         List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
176         if (reduceRoots.size() != 1) {
177             log.debug("Expected reduce to have single leaf");
178             return;
179         }
{code}
Did you mean to say "Expected reduce to have single root"?

{code}
209         // Removed POSort, if the predecessor require a databag, we need to add a PORelationToExprProject
{code}
Should the above comment read "if the successor requires ..."?

{code}
247         throw new VisitorException("Sort on columns from different inputs.");
{code}
Should this exception follow Error Handling guidelines to include errorcode, and error source?

{code}
253         } else if (mapLeaf instanceof POUnion || mapLeaf instanceof POSplit){
254             List<PhysicalOperator> preds = mr.mapPlan.getPredecessors(mapLeaf);
255             for (PhysicalOperator pred:preds) {
256                 POLocalRearrange rearrange = (POLocalRearrange)pred;
257                 rearrange.setUseSecondaryKey(true);
258                 if (rearrange.getIndex()==indexOfRearrangeToChange)
259                     setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
260             }
261         }
{code}
If mapLeaf is a POSplit, the POSplit may have POLocalRearrange as the leaf (in multi query optimized queries) - should we be handling those?
Also in the if statement on line 258, what if the condition evaluates to false - should we throw an Exception like earlier in the same method?
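One possible reading of the line 258 question is "what if no predecessor matches indexOfRearrangeToChange". A rough sketch of that check follows, under stated assumptions: integers stand in for POLocalRearrange indexes, IllegalStateException stands in for whatever exception the method throws earlier, and the method name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: each Integer plays the role of a POLocalRearrange index.
class RearrangeMatchSketch {
    // Returns the position of the predecessor whose index equals
    // indexToChange, and throws if no predecessor matches.
    static int findRearrangeToChange(List<Integer> predIndexes, int indexToChange) {
        int match = -1;
        for (int i = 0; i < predIndexes.size(); i++) {
            // In the patch, setUseSecondaryKey(true) is applied to every predecessor here.
            if (predIndexes.get(i) == indexToChange) {
                match = i; // the patch would call setSecondaryPlan(...) at this point
            }
        }
        if (match < 0) {
            // Assumption: failing loudly is preferable to silently skipping the plan.
            throw new IllegalStateException(
                "Cannot find POLocalRearrange with index " + indexToChange);
        }
        return match;
    }

    public static void main(String[] args) {
        System.out.println(findRearrangeToChange(Arrays.asList(0, 1, 2), 1));
    }
}
```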
{code}
274         for (int i=1;i<columnChainInfo.size();i++) {
{code}
A comment explaining why this loop starts from 1 would be useful.

{code}
286         if (secondaryPlan.isEmpty())
287         {
{code}
A comment explaining when this would happen would be useful.

There is ForEachVisitor which visits *all* ForEach ops in the reduce plan - is a visitor needed? Shouldn't only the foreach following the POPackage be processed?

372         SecondaryKeyDiscoverVisitor(PhysicalPlan plan, List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
It seems like the secondarySortKeyInfo passed in the constructor call is always null - is that argument needed in the constructor?

431         throw new VisitorException("POForEach has more than 1 input plans");
Should this exception follow Error Handling guidelines to include errorcode, and error source?

A test case should be added for the case of a non-Project group by key, like group by $0 + $1. I did not follow the code path for this case - we should ensure this works with a nested sort in the foreach.

In ColumnChainInfo.java:
=========================
A comment on ColumnChainInfo and SortKeyInfo explaining how they track to POProjects in the plan would be useful.

{code}
42         ColumnInfo newColumnChainInfo = new ColumnInfo(star, columns, type);
{code}
I think it is better to name this variable newColumnInfo, since the name 'newColumnChainInfo' seems to suggest the object is of type ColumnChainInfo.

52         newColumns.add(columns.get(0)-1);
The above code suggests that we rely on columns only having 1 element - at position 0. If that is not true, should the code handle the case where columns has > 1 element?

53         ColumnInfo newColumnChainInfo = new ColumnInfo(star, newColumns, type);
I think it is better to name this variable newColumnInfo, since the name 'newColumnChainInfo' seems to suggest the object is of type ColumnChainInfo.
{code}
85     public void normalize() {
86         int i=size()-1;
87         while (i>=0 && getColumnInfo(i).star) {
88             columnInfos.remove(i);
89             i--;
90         }
91     }
{code}
What is the above method used for?

In SortKeyInfo.java:
=====================
{code}
    public void insertColumnInfo(int index, ColumnChainInfo columnChainInfo) {
        if (columnChains.size()<=index) {
            columnChains.add(columnChainInfo);
            return;
        }
        ColumnChainInfo chain = columnChains.get(index);
        chain.insertColumnChainInfo(columnChainInfo);
    }
{code}
Should this method be named "insertColumnChainInfo" instead? When would the if condition evaluate to false?

{code}
    public void normalize() {
        if (columnChains.size()==1)
            columnChains.get(0).normalize();
    }
{code}
What does the above do?

In POSortedDistinct.java:
=========================
{code}
75             if (in.returnStatus == POStatus.STATUS_EOP) {
76                 if (!isAccumulative() || !isAccumStarted()) {
77                     lastTuple = null;
78                 }
79                 return in;
80             }
{code}
What do the checks for isAccumulative() and isAccumStarted() do?

In POLocalRearrange.java:
=========================
Is there a way to refactor the code so that setPlans() and setSecondaryPlans() can share common code?

In POMultiQueryPackage.java:
============================
In setUseSecondaryKey(), the code sets the flag to true in all underlying packages irrespective of whether the argument to the method is true or false.
Also, in multi-query optimization, if some subqueries need the secondary key and some don't, is that use case handled?
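For the setUseSecondaryKey() point, a minimal sketch of passing the argument through to the underlying packages. The classes here are hypothetical stand-ins (InnerPackage for the contained package operators, MultiQueryPackageSketch for POMultiQueryPackage); the actual fields and method bodies in the patch may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for POMultiQueryPackage and the packages it wraps.
class MultiQueryPackageSketch {
    static class InnerPackage {
        private boolean useSecondaryKey;
        void setUseSecondaryKey(boolean use) { this.useSecondaryKey = use; }
        boolean getUseSecondaryKey() { return useSecondaryKey; }
    }

    private final List<InnerPackage> packages = new ArrayList<InnerPackage>();
    private boolean useSecondaryKey;

    void addPackage(InnerPackage pkg) { packages.add(pkg); }

    boolean getUseSecondaryKey() { return useSecondaryKey; }

    // Propagates the caller's value instead of a hard-coded true.
    void setUseSecondaryKey(boolean use) {
        this.useSecondaryKey = use;
        for (InnerPackage pkg : packages) {
            pkg.setUseSecondaryKey(use);
        }
    }

    public static void main(String[] args) {
        MultiQueryPackageSketch mq = new MultiQueryPackageSketch();
        InnerPackage pkg = new InnerPackage();
        mq.addPackage(pkg);
        mq.setUseSecondaryKey(false);
        System.out.println(pkg.getUseSecondaryKey());
    }
}
```

This only covers the all-or-nothing case. If individual subqueries can differ, a per-package flag would be needed, which this sketch does not attempt.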
> Optimize nested distinct/sort to use secondary key
> --------------------------------------------------
>
>                 Key: PIG-1038
>                 URL: https://issues.apache.org/jira/browse/PIG-1038
>             Project: Pig
>          Issue Type: Improvement
>          Components: impl
>    Affects Versions: 0.4.0
>            Reporter: Olga Natkovich
>            Assignee: Daniel Dai
>             Fix For: 0.6.0
>
>         Attachments: PIG-1038-1.patch, PIG-1038-2.patch
>
>
> If nested foreach plan contains sort/distinct, it is possible to use hadoop secondary sort instead of SortedDataBag and DistinctDataBag to optimize the query.
> Eg1:
> A = load 'mydata';
> B = group A by $0;
> C = foreach B {
>     D = order A by $1;
>     generate group, D;
> }
> store C into 'myresult';
> We can specify a secondary sort on A.$1, and drop "order A by $1".
> Eg2:
> A = load 'mydata';
> B = group A by $0;
> C = foreach B {
>     D = A.$1;
>     E = distinct D;
>     generate group, E;
> }
> store C into 'myresult';
> We can specify a secondary sort key on A.$1, and simplify "D=A.$1; E=distinct D" to a special version of distinct, which does not do the sorting.
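The idea in the issue description can be simulated in plain Java, without Hadoop or Pig. This is only an illustrative sketch: Rec, SORT and shuffle are hypothetical names, and the in-memory sort stands in for the shuffle phase. Records are ordered by (group key, secondary key) but grouped by group key alone, so each group's values arrive already sorted (Eg1), and distinct reduces to comparing each value with the previous one (Eg2).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of secondary sort; not the Hadoop or Pig API.
class SecondarySortSketch {
    static class Rec {
        final String group; // plays the role of $0, the group key
        final int value;    // plays the role of $1, the secondary key
        Rec(String group, int value) { this.group = group; this.value = value; }
    }

    // Sort order used by the simulated shuffle: group key first, then secondary key.
    static final Comparator<Rec> SORT = new Comparator<Rec>() {
        public int compare(Rec a, Rec b) {
            int c = a.group.compareTo(b.group);
            return c != 0 ? c : Integer.compare(a.value, b.value);
        }
    };

    // Groups on the group key only. Values within each group come out sorted
    // without a per-group sort; with distinct=true, adjacent duplicates are dropped.
    static Map<String, List<Integer>> shuffle(List<Rec> input, boolean distinct) {
        List<Rec> sorted = new ArrayList<Rec>(input);
        Collections.sort(sorted, SORT);
        Map<String, List<Integer>> out = new LinkedHashMap<String, List<Integer>>();
        for (Rec r : sorted) {
            List<Integer> bag = out.get(r.group);
            if (bag == null) {
                bag = new ArrayList<Integer>();
                out.put(r.group, bag);
            }
            if (distinct && !bag.isEmpty() && bag.get(bag.size() - 1) == r.value) {
                continue; // same as the previous value in this group: skip it
            }
            bag.add(r.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec("a", 3), new Rec("b", 1), new Rec("a", 1), new Rec("a", 3));
        System.out.println(shuffle(input, false));
        System.out.println(shuffle(input, true));
    }
}
```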