Author: daijy
Date: Wed Aug 19 07:01:15 2009
New Revision: 805684
URL: http://svn.apache.org/viewvc?rev=805684&view=rev
Log:
PIG-925: Fix join in local mode
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Aug 19 07:01:15 2009
@@ -54,6 +54,8 @@
BUG FIXES
+ PIG-925: Fix join in local mode (daijy)
+
PIG-913: Error in Pig script when grouping on chararray column (daijy)
PIG-907: Provide multiple version of HashFNV (Piggybank) (daijy)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
---
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
(original)
+++
hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
Wed Aug 19 07:01:15 2009
@@ -28,7 +28,9 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import
org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
@@ -41,6 +43,7 @@
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LOStore;
@@ -48,6 +51,7 @@
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
@@ -132,6 +136,106 @@
}
@Override
+ public void visit(LOJoin join) throws VisitorException {
+ String scope = join.getOperatorKey().scope;
+ List<LogicalOperator> inputs = join.getInputs();
+
+ // In local mode, LOJoin is achieved by POCogroup followed by a
POForEach with flatten
+ // Insert a POCogroup in the place of LOJoin
+ POCogroup poc = new POCogroup(new OperatorKey(scope,
nodeGen.getNextNodeId(scope)), join.getRequestedParallelism());
+ boolean innerArray[] = new boolean[join.getInputs().size()];
+ for (int i=0;i<join.getInputs().size();i++)
+ innerArray[i] = true;
+ poc.setInner(innerArray);
+
+ currentPlan.add(poc);
+
+        // Add inner plans to POCogroup
+ int count = 0;
+ Byte type = null;
+ for(LogicalOperator lo : inputs) {
+ List<LogicalPlan> plans = (List<LogicalPlan>)
join.getJoinPlans().get(lo);
+
+ POLocalRearrangeForIllustrate physOp = new
POLocalRearrangeForIllustrate(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), join
+ .getRequestedParallelism());
+ List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+ currentPlans.push(currentPlan);
+ for (LogicalPlan lp : plans) {
+ currentPlan = new PhysicalPlan();
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker =
mCurrentWalker
+ .spawnChildWalker(lp);
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ exprPlans.add(currentPlan);
+ popWalker();
+
+ }
+ currentPlan = currentPlans.pop();
+ try {
+ physOp.setPlans(exprPlans);
+ } catch (PlanException pe) {
+ throw new VisitorException(pe);
+ }
+ try {
+ physOp.setIndex(count++);
+ } catch (ExecException e1) {
+ throw new VisitorException(e1);
+ }
+ if (plans.size() > 1) {
+ type = DataType.TUPLE;
+ physOp.setKeyType(type);
+ } else {
+ type = exprPlans.get(0).getLeaves().get(0).getResultType();
+ physOp.setKeyType(type);
+ }
+ physOp.setResultType(DataType.TUPLE);
+
+ currentPlan.add(physOp);
+
+ try {
+ currentPlan.connect(LogToPhyMap.get(lo), physOp);
+ currentPlan.connect(physOp, poc);
+ } catch (PlanException e) {
+ log.error("Invalid physical operators in the physical plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
+
+ }
+
+ // Append POForEach after POCogroup
+ List<Boolean> flattened = new ArrayList<Boolean>();
+ List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+ for (int i=0;i<join.getInputs().size();i++)
+ {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)));
+ prj.setResultType(DataType.BAG);
+ prj.setColumn(i+1);
+ prj.setOverloaded(false);
+ prj.setStar(false);
+ ep.add(prj);
+ eps.add(ep);
+ flattened.add(true);
+ }
+
+ POForEach fe = new POForEach(new
OperatorKey(scope,nodeGen.getNextNodeId(scope)),-1,eps,flattened);
+
+ fe.setResultType(DataType.BAG);
+
+ currentPlan.add(fe);
+ LogToPhyMap.put(join, fe);
+ try {
+ currentPlan.connect(poc, fe);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode,
PigException.BUG, e);
+ }
+ }
+
+ @Override
public void visit(LOSplit split) throws VisitorException {
String scope = split.getOperatorKey().scope;
PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java?rev=805684&r1=805683&r2=805684&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocal2.java Wed Aug 19
07:01:15 2009
@@ -136,6 +136,47 @@
}
+ @Test
+ public void testJoin1() throws Exception {
+        // Regression test for PIG-925
+ File fp1 = File.createTempFile("test1", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+
+ ps.println("1\t1");
+ ps.println("2\t2");
+ ps.close();
+
+ File fp2 = File.createTempFile("test2", "txt");
+ ps = new PrintStream(new FileOutputStream(fp2));
+
+ ps.println("1\t1");
+ ps.println("2\t2");
+ ps.close();
+
+
+ pig.registerQuery("A = load '" + Util.generateURI(fp1.toString()) +
"'AS (a0:int, a1:int); ");
+ pig.registerQuery("B = load '" + Util.generateURI(fp2.toString()) +
"'AS (b0:int, b1:int); ");
+ pig.registerQuery("C = join A by a0, B by b0;");
+
+ Iterator<Tuple> iter = pig.openIterator("C");
+ assertTrue(iter.hasNext());
+ Tuple t = iter.next();
+ assertTrue(t.get(0).equals(new Integer(1)));
+ assertTrue(t.get(1).equals(new Integer(1)));
+ assertTrue(t.get(2).equals(new Integer(1)));
+ assertTrue(t.get(3).equals(new Integer(1)));
+
+ assertTrue(iter.hasNext());
+ t = iter.next();
+ assertTrue(t.get(0).equals(new Integer(2)));
+ assertTrue(t.get(1).equals(new Integer(2)));
+ assertTrue(t.get(2).equals(new Integer(2)));
+ assertTrue(t.get(3).equals(new Integer(2)));
+
+ assertTrue(!iter.hasNext());
+ }
+
+
static public class Pig800Udf extends EvalFunc<DataBag> {
@Override