Author: daijy
Date: Tue Dec 8 22:24:04 2009
New Revision: 888601
URL: http://svn.apache.org/viewvc?rev=888601&view=rev
Log:
PIG-1132: Column Pruner issues in dealing with unprunable loader
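In short: when a loader such as BinStorage cannot prune columns itself, the optimizer now inserts an LOForEach right after the LOLoad and runs the ColumnPruner from that foreach, instead of skipping pruning altogether. A minimal sketch of the scenario (illustrative only: it assumes local execution and a placeholder input path 'intermediate'; the real coverage is in TestPruneColumn below):

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class UnprunableLoaderSketch {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer(ExecType.LOCAL);

            // BinStorage cannot push column pruning into the loader itself.
            pigServer.registerQuery("A = load 'intermediate' using BinStorage() as (a0, a1, a2);");
            // Only a0 is used downstream, so $1 and $2 are prunable.
            pigServer.registerQuery("B = foreach A generate a0;");

            // With this patch the optimizer inserts a foreach after the load and logs
            // "Columns pruned for A: $1, $2" rather than
            // "Cannot prune A, BinStorage does not support pruning, add foreach".
            Iterator<Tuple> iter = pigServer.openIterator("B");
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
        }
    }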
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Dec 8 22:24:04 2009
@@ -261,6 +261,8 @@
PIG-1133: UDFContext should be made available to LoadFunc.bindTo (daijy)
+PIG-1132: Column Pruner issues in dealing with unprunable loader (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Tue Dec 8 22:24:04 2009
@@ -37,11 +37,11 @@
     private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap;
     LogicalPlan plan;
-    public ColumnPruner(LogicalPlan plan, LOLoad load, List<Pair<Integer, Integer>> prunedColumns,
+    public ColumnPruner(LogicalPlan plan, LogicalOperator op, List<Pair<Integer, Integer>> prunedColumns,
             PlanWalker<LogicalOperator, LogicalPlan> walker) {
         super(plan, walker);
         prunedColumnsMap = new HashMap<LogicalOperator, List<Pair<Integer,Integer>>>();
-        prunedColumnsMap.put(load, prunedColumns);
+        prunedColumnsMap.put(op, prunedColumns);
         this.plan = plan;
     }
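The constructor above now accepts any LogicalOperator as the operator whose columns are being pruned, not just an LOLoad. PruneColumns (next file) uses this to root the pruning at the inserted LOForEach when the loader cannot prune for itself. A rough sketch of that decision as a hypothetical helper (name is illustrative, imports as already present in PruneColumns.java; the patch below inlines this logic directly):

    // Hypothetical helper, not part of the patch: pick the operator to hand to ColumnPruner.
    // 'forEach' is null when the loader pruned its own columns; otherwise it is the
    // LOForEach inserted between the unprunable load and its successor.
    private ColumnPruner buildColumnPruner(LogicalPlan mPlan, LOLoad load, LOForEach forEach,
            List<Pair<Integer, Integer>> pruneList) {
        LogicalOperator start = (forEach == null) ? load : forEach;
        return new ColumnPruner(mPlan, start, pruneList,
                new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan));
    }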
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Tue Dec 8 22:24:04 2009
@@ -59,7 +59,9 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 class RequiredInfo {
@@ -696,6 +698,7 @@
             }
             // Loader does not support column pruning, insert foreach
+            LOForEach forEach = null;
             if (response==null || !response.getRequiredFieldResponse())
             {
                 Set<Integer> columnsToProject = new TreeSet<Integer>();
@@ -713,71 +716,86 @@
                     projectPlan.add(column);
                     generatePlans.add(projectPlan);
                 }
-                LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList);
+                forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList);
                 LogicalOperator pred = mPlan.getSuccessors(load).get(0);
+                /*mPlan.disconnect(load, pred);
                 mPlan.add(forEach);
-                mPlan.insertBetween(load, forEach, pred);
-                String message = "Cannot prune " + load.getAlias() + ", " + load.getLoadFunc().getClass().getSimpleName() + " does not support pruning, add foreach";
-                log.info(message);
+                mPlan.connect(load, forEach);
+                mPlan.connect(forEach, pred);
+                forEach.getSchema();*/
+                MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, Column>();
+                List<Column> columns;
+                for (int i=0;i<=load.getSchema().size();i++) {
+                    columns = new ArrayList<Column>();
+                    columns.add(new Column(new Pair<Integer, Integer>(0, i)));
+                    mappedFields.put(i, columns);
+                }
+                mPlan.add(forEach);
+                mPlan.doInsertBetween(load, forEach, pred, false);
+                forEach.getProjectionMap().setMappedFields(mappedFields);
+                pred.rewire(load, 0, forEach, false);
             }
-            // We get positive response, begin to prune
-            if (response!=null && response.getRequiredFieldResponse())
+            // Begin to prune
+            for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields())
+                columnRequired[pair.second] = true;
+
+            List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
+            for (int i=0;i<columnRequired.length;i++)
             {
-                for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields())
-                    columnRequired[pair.second] = true;
-
-                List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
-                for (int i=0;i<columnRequired.length;i++)
-                {
-                    if (!columnRequired[i])
-                        pruneList.add(new Pair<Integer, Integer>(0, i));
-                }
+                if (!columnRequired[i])
+                    pruneList.add(new Pair<Integer, Integer>(0, i));
+            }
-                StringBuffer message = new StringBuffer();
-                if (pruneList.size()!=0)
-                {
-                    ColumnPruner columnPruner = new ColumnPruner(mPlan, load, pruneList,
+            StringBuffer message = new StringBuffer();
+            if (pruneList.size()!=0)
+            {
+
+                ColumnPruner columnPruner;
+                if (forEach == null)
+                    columnPruner = new ColumnPruner(mPlan, load, pruneList,
+                        new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan));
+                else
+                    columnPruner = new ColumnPruner(mPlan, forEach, pruneList,
                         new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan));
-
-                    columnPruner.visit();
+
+                columnPruner.visit();
-                    message.append("Columns pruned for " + load.getAlias() + ": ");
-                    for (int i=0;i<pruneList.size();i++)
-                    {
-                        message.append("$"+pruneList.get(i).second);
-                        if (i!=pruneList.size()-1)
-                            message.append(", ");
-                    }
-                    log.info(message);
+                message.append("Columns pruned for " + load.getAlias() + ": ");
+                for (int i=0;i<pruneList.size();i++)
+                {
+                    message.append("$"+pruneList.get(i).second);
+                    if (i!=pruneList.size()-1)
+                        message.append(", ");
                 }
-                else
-                    log.info("No column pruned for " + load.getAlias());
-                message = new StringBuffer();;
-                for (LoadFunc.RequiredField rf : requiredFieldList.getFields())
+                log.info(message);
+            }
+            else
+                log.info("No column pruned for " + load.getAlias());
+            message = new StringBuffer();;
+            for (LoadFunc.RequiredField rf : requiredFieldList.getFields())
+            {
+                if (rf.getSubFields()!=null)
                 {
-                    if (rf.getSubFields()!=null)
+                    message.append("Map key required for " + load.getAlias()+": ");
+                    if (rf.getIndex()!=-1)
+                        message.append("$"+rf.getIndex());
+                    else
+                        message.append(rf.getAlias());
+                    message.append("->[");
+                    for (int i=0;i<rf.getSubFields().size();i++)
                     {
-                        message.append("Map key required for " + load.getAlias()+": ");
-                        if (rf.getIndex()!=-1)
-                            message.append("$"+rf.getIndex());
-                        else
-                            message.append(rf.getAlias());
-                        message.append("->[");
-                        for (int i=0;i<rf.getSubFields().size();i++)
-                        {
-                            LoadFunc.RequiredField keyrf = rf.getSubFields().get(i);
-                            message.append(keyrf);
-                            if (i!=rf.getSubFields().size()-1)
-                                message.append(",");
-                        }
-                        message.append("] ");
+                        LoadFunc.RequiredField keyrf = rf.getSubFields().get(i);
+                        message.append(keyrf);
+                        if (i!=rf.getSubFields().size()-1)
+                            message.append(",");
                     }
+                    message.append("] ");
                 }
-                if (message.length()!=0)
-                    log.info(message);
-                else
-                    log.info("No map keys pruned for " + load.getAlias());
             }
+            if (message.length()!=0)
+                log.info(message);
+            else
+                log.info("No map keys pruned for " + load.getAlias());
         }
     }
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=888601&r1=888600&r2=888601&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Dec 8 22:24:04 2009
@@ -1280,13 +1280,12 @@
     }
     @Test
-    public void testBinStorage() throws Exception {
+    public void testBinStorage1() throws Exception {
         File intermediateFile = File.createTempFile("intemediate", "txt");
         intermediateFile.delete();
         intermediateFile.mkdirs();
         pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);");
         pigServer.store("A", intermediateFile.toString(), "BinStorage()");
-        intermediateFile.delete();
         pigServer.registerQuery("A = load '"+ intermediateFile.toString()
                 + "' using BinStorage() as (a0, a1, a2);");
@@ -1307,8 +1306,48 @@
         assertFalse(iter.hasNext());
-        assertTrue(checkLogFileMessage(new String[]{"Cannot prune A, BinStorage does not support pruning, add foreach"}));
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
+                "No map keys pruned for A"}));
+
+        intermediateFile.delete();
+    }
+
+    @Test
+    public void testBinStorage2() throws Exception {
+        File intermediateFile = File.createTempFile("intemediate", "txt");
+        intermediateFile.delete();
+        intermediateFile.mkdirs();
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString()) + "' as (a0, a1, a2);");
+        pigServer.store("A", intermediateFile.toString(), "BinStorage()");
+
+        pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+                + "' using BinStorage() as (a0, a1, a2);");
+
+        pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
+        pigServer.registerQuery("C = foreach B generate a0, a2;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("1"));
+        assertTrue(t.get(1).toString().equals("3"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.size()==2);
+        assertTrue(t.get(0).toString().equals("2"));
+        assertTrue(t.get(0).toString().equals("2"));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+                "No map keys pruned for A"}));
+
+        intermediateFile.delete();
     }
+
     @Test
     public void testProjectCastKeyLookup() throws Exception {