Author: daijy
Date: Tue Dec  8 22:24:04 2009
New Revision: 888601

URL: http://svn.apache.org/viewvc?rev=888601&view=rev
Log:
PIG-1132: Column Pruner issues in dealing with unprunable loader

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Dec  8 22:24:04 2009
@@ -261,6 +261,8 @@
 
 PIG-1133: UDFContext should be made available to LoadFunc.bindTo (daijy)
 
+PIG-1132: Column Pruner issues in dealing with unprunable loader (daijy)
+
 Release 0.5.0
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Tue Dec  8 22:24:04 2009
@@ -37,11 +37,11 @@
     private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap;
     LogicalPlan plan;
     
-    public ColumnPruner(LogicalPlan plan, LOLoad load, List<Pair<Integer, 
Integer>> prunedColumns, 
+    public ColumnPruner(LogicalPlan plan, LogicalOperator op, 
List<Pair<Integer, Integer>> prunedColumns, 
             PlanWalker<LogicalOperator, LogicalPlan> walker) {
         super(plan, walker);
         prunedColumnsMap = new HashMap<LogicalOperator, 
List<Pair<Integer,Integer>>>();
-        prunedColumnsMap.put(load, prunedColumns);
+        prunedColumnsMap.put(op, prunedColumns);
         this.plan = plan;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Dec  8 22:24:04 2009
@@ -59,7 +59,9 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 
 class RequiredInfo {
@@ -696,6 +698,7 @@
         }
         
         // Loader does not support column pruning, insert foreach
+        LOForEach forEach = null;
         if (response==null || !response.getRequiredFieldResponse())
         {
             Set<Integer> columnsToProject = new TreeSet<Integer>();
@@ -713,71 +716,86 @@
                 projectPlan.add(column);
                 generatePlans.add(projectPlan);
             }
-            LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, 
flattenList);
+            forEach = new LOForEach(mPlan, new OperatorKey(scope, 
NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, 
flattenList);
             LogicalOperator pred = mPlan.getSuccessors(load).get(0);
+            /*mPlan.disconnect(load, pred);
             mPlan.add(forEach);
-            mPlan.insertBetween(load, forEach, pred);
-            String message = "Cannot prune " + load.getAlias() + ", " + 
load.getLoadFunc().getClass().getSimpleName() + " does not support pruning, add 
foreach";
-            log.info(message);
+            mPlan.connect(load, forEach);
+            mPlan.connect(forEach, pred);
+            forEach.getSchema();*/
+            MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, 
Column>();
+            List<Column> columns;
+            for (int i=0;i<=load.getSchema().size();i++) {
+                columns = new ArrayList<Column>();
+                columns.add(new Column(new Pair<Integer, Integer>(0, i)));
+                mappedFields.put(i, columns);
+            }
+            mPlan.add(forEach);
+            mPlan.doInsertBetween(load, forEach, pred, false);
+            forEach.getProjectionMap().setMappedFields(mappedFields);
+            pred.rewire(load, 0, forEach, false);
         }
         
-        // We get positive response, begin to prune
-        if (response!=null && response.getRequiredFieldResponse())
+        // Begin to prune
+        for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields())
+            columnRequired[pair.second] = true;
+        
+        List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, 
Integer>>();
+        for (int i=0;i<columnRequired.length;i++)
         {
-            for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields())
-                columnRequired[pair.second] = true;
-            
-            List<Pair<Integer, Integer>> pruneList = new 
ArrayList<Pair<Integer, Integer>>();
-            for (int i=0;i<columnRequired.length;i++)
-            {
-                if (!columnRequired[i])
-                    pruneList.add(new Pair<Integer, Integer>(0, i));
-            }
+            if (!columnRequired[i])
+                pruneList.add(new Pair<Integer, Integer>(0, i));
+        }
 
-            StringBuffer message = new StringBuffer();
-            if (pruneList.size()!=0)
-            {
-                ColumnPruner columnPruner = new ColumnPruner(mPlan, load, 
pruneList, 
+        StringBuffer message = new StringBuffer();
+        if (pruneList.size()!=0)
+        {
+            
+            ColumnPruner columnPruner;
+            if (forEach == null)
+                columnPruner = new ColumnPruner(mPlan, load, pruneList, 
+                    new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(mPlan));
+            else
+                columnPruner = new ColumnPruner(mPlan, forEach, pruneList, 
                         new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(mPlan));
-                
-                columnPruner.visit();
+            
+            columnPruner.visit();
 
-                message.append("Columns pruned for " + load.getAlias() + ": ");
-                for (int i=0;i<pruneList.size();i++)
-                {
-                    message.append("$"+pruneList.get(i).second);
-                    if (i!=pruneList.size()-1)
-                        message.append(", ");
-                }
-                log.info(message);
+            message.append("Columns pruned for " + load.getAlias() + ": ");
+            for (int i=0;i<pruneList.size();i++)
+            {
+                message.append("$"+pruneList.get(i).second);
+                if (i!=pruneList.size()-1)
+                    message.append(", ");
             }
-            else
-                log.info("No column pruned for " + load.getAlias());
-            message = new StringBuffer();;
-            for (LoadFunc.RequiredField rf : requiredFieldList.getFields())
+            log.info(message);
+        }
+        else
+            log.info("No column pruned for " + load.getAlias());
+        message = new StringBuffer();;
+        for (LoadFunc.RequiredField rf : requiredFieldList.getFields())
+        {
+            if (rf.getSubFields()!=null)
             {
-                if (rf.getSubFields()!=null)
+                message.append("Map key required for " + load.getAlias()+": ");
+                if (rf.getIndex()!=-1)
+                    message.append("$"+rf.getIndex());
+                else
+                    message.append(rf.getAlias());
+                message.append("->[");
+                for (int i=0;i<rf.getSubFields().size();i++)
                 {
-                    message.append("Map key required for " + 
load.getAlias()+": ");
-                    if (rf.getIndex()!=-1)
-                        message.append("$"+rf.getIndex());
-                    else
-                        message.append(rf.getAlias());
-                    message.append("->[");
-                    for (int i=0;i<rf.getSubFields().size();i++)
-                    {
-                        LoadFunc.RequiredField keyrf = 
rf.getSubFields().get(i);
-                        message.append(keyrf);
-                        if (i!=rf.getSubFields().size()-1)
-                            message.append(",");
-                    }
-                    message.append("] ");
+                    LoadFunc.RequiredField keyrf = rf.getSubFields().get(i);
+                    message.append(keyrf);
+                    if (i!=rf.getSubFields().size()-1)
+                        message.append(",");
                 }
+                message.append("] ");
             }
-            if (message.length()!=0)
-                log.info(message);
-            else
-                log.info("No map keys pruned for " + load.getAlias());
         }
+        if (message.length()!=0)
+            log.info(message);
+        else
+            log.info("No map keys pruned for " + load.getAlias());
     }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Dec  8 22:24:04 2009
@@ -1280,13 +1280,12 @@
     }
     
     @Test
-    public void testBinStorage() throws Exception {
+    public void testBinStorage1() throws Exception {
         File intermediateFile = File.createTempFile("intemediate", "txt");
         intermediateFile.delete();
         intermediateFile.mkdirs();
         pigServer.registerQuery("A = load '"+ 
Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);");
         pigServer.store("A", intermediateFile.toString(), "BinStorage()");
-        intermediateFile.delete();
         
         pigServer.registerQuery("A = load '"+ intermediateFile.toString() 
                 + "' using BinStorage() as (a0, a1, a2);");
@@ -1307,8 +1306,48 @@
         
         assertFalse(iter.hasNext());
         
-        assertTrue(checkLogFileMessage(new String[]{"Cannot prune A, 
BinStorage does not support pruning, add foreach"}));
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, 
$2", 
+            "No map keys pruned for A"}));
+        
+        intermediateFile.delete();
+    }
+    
+    @Test
+    public void testBinStorage2() throws Exception {
+        File intermediateFile = File.createTempFile("intemediate", "txt");
+        intermediateFile.delete();
+        intermediateFile.mkdirs();
+        pigServer.registerQuery("A = load '"+ 
Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);");
+        pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+        
+        pigServer.registerQuery("A = load '"+ intermediateFile.toString() 
+                + "' using BinStorage() as (a0, a1, a2);");
+        
+        pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
+        pigServer.registerQuery("C = foreach B generate a0, a2;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("3"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(0).toString().equals("2"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: 
$1", 
+            "No map keys pruned for A"}));
+        
+        intermediateFile.delete();
     }
+
     
     @Test
     public void testProjectCastKeyLookup() throws Exception {


Reply via email to