Author: daijy Date: Tue Dec 8 22:24:04 2009 New Revision: 888601 URL: http://svn.apache.org/viewvc?rev=888601&view=rev Log: PIG-1132: Column Pruner issues in dealing with unprunable loader
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=888601&r1=888600&r2=888601&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Dec 8 22:24:04 2009 @@ -261,6 +261,8 @@ PIG-1133: UDFContext should be made available to LoadFunc.bindTo (daijy) +PIG-1132: Column Pruner issues in dealing with unprunable loader (daijy) + Release 0.5.0 INCOMPATIBLE CHANGES Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=888601&r1=888600&r2=888601&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Tue Dec 8 22:24:04 2009 @@ -37,11 +37,11 @@ private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap; LogicalPlan plan; - public ColumnPruner(LogicalPlan plan, LOLoad load, List<Pair<Integer, Integer>> prunedColumns, + public ColumnPruner(LogicalPlan plan, LogicalOperator op, List<Pair<Integer, Integer>> prunedColumns, PlanWalker<LogicalOperator, LogicalPlan> walker) { super(plan, walker); prunedColumnsMap = new HashMap<LogicalOperator, List<Pair<Integer,Integer>>>(); - prunedColumnsMap.put(load, prunedColumns); + prunedColumnsMap.put(op, prunedColumns); this.plan = plan; } Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=888601&r1=888600&r2=888601&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Dec 8 22:24:04 2009 @@ -59,7 +59,9 @@ import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.RequiredFields; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.plan.ProjectionMap.Column; import org.apache.pig.impl.plan.optimizer.OptimizerException; +import org.apache.pig.impl.util.MultiMap; import org.apache.pig.impl.util.Pair; class RequiredInfo { @@ -696,6 +698,7 @@ } // Loader does not support column pruning, insert foreach + LOForEach forEach = null; if (response==null || !response.getRequiredFieldResponse()) { Set<Integer> columnsToProject = new TreeSet<Integer>(); @@ -713,71 +716,86 @@ projectPlan.add(column); generatePlans.add(projectPlan); } - LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList); + forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList); LogicalOperator pred = mPlan.getSuccessors(load).get(0); + /*mPlan.disconnect(load, pred); mPlan.add(forEach); - mPlan.insertBetween(load, forEach, pred); - String message = "Cannot prune " + load.getAlias() + ", " + load.getLoadFunc().getClass().getSimpleName() + " does not support pruning, add foreach"; - log.info(message); + mPlan.connect(load, forEach); + mPlan.connect(forEach, pred); + forEach.getSchema();*/ + MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, Column>(); + List<Column> columns; + for (int i=0;i<=load.getSchema().size();i++) { + columns = new ArrayList<Column>(); + columns.add(new Column(new Pair<Integer, Integer>(0, i))); + mappedFields.put(i, columns); + } + mPlan.add(forEach); + mPlan.doInsertBetween(load, forEach, pred, false); + forEach.getProjectionMap().setMappedFields(mappedFields); + pred.rewire(load, 0, forEach, false); } - // We get positive response, begin to prune - if (response!=null && response.getRequiredFieldResponse()) + // Begin to prune + for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields()) + columnRequired[pair.second] = true; + + List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>(); + for (int i=0;i<columnRequired.length;i++) { - for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields()) - columnRequired[pair.second] = true; - - List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>(); - for (int i=0;i<columnRequired.length;i++) - { - if (!columnRequired[i]) - pruneList.add(new Pair<Integer, Integer>(0, i)); - } + if (!columnRequired[i]) + pruneList.add(new Pair<Integer, Integer>(0, i)); + } - StringBuffer message = new StringBuffer(); - if (pruneList.size()!=0) - { - ColumnPruner columnPruner = new ColumnPruner(mPlan, load, pruneList, + StringBuffer message = new StringBuffer(); + if (pruneList.size()!=0) + { + + ColumnPruner columnPruner; + if (forEach == null) + columnPruner = new ColumnPruner(mPlan, load, pruneList, + new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan)); + else + columnPruner = new ColumnPruner(mPlan, forEach, pruneList, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan)); - - columnPruner.visit(); + + columnPruner.visit(); - message.append("Columns pruned for " + load.getAlias() + ": "); - for (int i=0;i<pruneList.size();i++) - { - message.append("$"+pruneList.get(i).second); - if (i!=pruneList.size()-1) - message.append(", "); - } - log.info(message); + message.append("Columns pruned for " + load.getAlias() + ": "); + for (int i=0;i<pruneList.size();i++) + { + message.append("$"+pruneList.get(i).second); + if (i!=pruneList.size()-1) + message.append(", "); } - else - log.info("No column pruned for " + load.getAlias()); - message = new StringBuffer();; - for (LoadFunc.RequiredField rf : requiredFieldList.getFields()) + log.info(message); + } + else + log.info("No column pruned for " + load.getAlias()); + message = new StringBuffer();; + for (LoadFunc.RequiredField rf : requiredFieldList.getFields()) + { + if (rf.getSubFields()!=null) { - if (rf.getSubFields()!=null) + message.append("Map key required for " + load.getAlias()+": "); + if (rf.getIndex()!=-1) + message.append("$"+rf.getIndex()); + else + message.append(rf.getAlias()); + message.append("->["); + for (int i=0;i<rf.getSubFields().size();i++) { - message.append("Map key required for " + load.getAlias()+": "); - if (rf.getIndex()!=-1) - message.append("$"+rf.getIndex()); - else - message.append(rf.getAlias()); - message.append("->["); - for (int i=0;i<rf.getSubFields().size();i++) - { - LoadFunc.RequiredField keyrf = rf.getSubFields().get(i); - message.append(keyrf); - if (i!=rf.getSubFields().size()-1) - message.append(","); - } - message.append("] "); + LoadFunc.RequiredField keyrf = rf.getSubFields().get(i); + message.append(keyrf); + if (i!=rf.getSubFields().size()-1) + message.append(","); } + message.append("] "); } - if (message.length()!=0) - log.info(message); - else - log.info("No map keys pruned for " + load.getAlias()); } + if (message.length()!=0) + log.info(message); + else + log.info("No map keys pruned for " + load.getAlias()); } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=888601&r1=888600&r2=888601&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Dec 8 22:24:04 2009 @@ -1280,13 +1280,12 @@ } @Test - public void testBinStorage() throws Exception { + public void testBinStorage1() throws Exception { File intermediateFile = File.createTempFile("intemediate", "txt"); intermediateFile.delete(); intermediateFile.mkdirs(); pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);"); pigServer.store("A", intermediateFile.toString(), "BinStorage()"); - intermediateFile.delete(); pigServer.registerQuery("A = load '"+ intermediateFile.toString() + "' using BinStorage() as (a0, a1, a2);"); @@ -1307,8 +1306,48 @@ assertFalse(iter.hasNext()); - assertTrue(checkLogFileMessage(new String[]{"Cannot prune A, BinStorage does not support pruning, add foreach"})); + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2", + "No map keys pruned for A"})); + + intermediateFile.delete(); + } + + @Test + public void testBinStorage2() throws Exception { + File intermediateFile = File.createTempFile("intemediate", "txt"); + intermediateFile.delete(); + intermediateFile.mkdirs(); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);"); + pigServer.store("A", intermediateFile.toString(), "BinStorage()"); + + pigServer.registerQuery("A = load '"+ intermediateFile.toString() + + "' using BinStorage() as (a0, a1, a2);"); + + pigServer.registerQuery("B = foreach A generate a2, a0, a1;"); + pigServer.registerQuery("C = foreach B generate a0, a2;"); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + + assertTrue(iter.hasNext()); + Tuple t = iter.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0).toString().equals("1")); + assertTrue(t.get(1).toString().equals("3")); + + assertTrue(iter.hasNext()); + t = iter.next(); + assertTrue(t.size()==2); + assertTrue(t.get(0).toString().equals("2")); + assertTrue(t.get(0).toString().equals("2")); + + assertFalse(iter.hasNext()); + + assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1", + "No map keys pruned for A"})); + + intermediateFile.delete(); } + @Test public void testProjectCastKeyLookup() throws Exception {