Author: daijy
Date: Fri Jun 19 18:02:44 2009
New Revision: 786607
URL: http://svn.apache.org/viewvc?rev=786607&view=rev
Log:
PIG-797: Limit with ORDER BY producing wrong results
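
For context, a minimal Pig Latin sketch of the scenario this fixes (the input
path is illustrative; the query mirrors the new test case in TestEvalPipeline2
below):

    A = LOAD 'input.txt' AS (num:int);
    B = ORDER A BY num PARALLEL 2;
    C = LIMIT B 10;
    DUMP C;   -- before this fix, the dumped rows could violate the order imposed by B

The limit-adjust MapReduce job previously always went through
simpleConnectMapToReduce(), which does not carry over the sort job's key
arrangement; connectMapToReduceLimitedSort() now clones the sort job's
POLocalRearrange and POPackage so that the ordering established by the sort
is preserved through the added limit stage.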
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jun 19 18:02:44 2009
@@ -84,6 +84,8 @@
BUG FIXES
+PIG-797: Limit with ORDER BY producing wrong results (daijy)
+
PIG-850: Dump produce wrong result while "store into" is ok (daijy)
PIG-846: MultiQuery optimization in some cases has an issue when there is a
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jun 19 18:02:44 2009
@@ -712,9 +712,38 @@
}
}
+ public void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
+ {
+ POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
+
+ POLocalRearrange lr = null;
+ try {
+ lr = slr.clone();
+ } catch (CloneNotSupportedException e) {
+ int errCode = 2147;
+ String msg = "Error cloning POLocalRearrange for limit after sort";
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+
+ mro.mapPlan.addAsLeaf(lr);
+
+ POPackage spkg = (POPackage)sortMROp.reducePlan.getRoots().get(0);
+
+ POPackage pkg = null;
+ try {
+ pkg = spkg.clone();
+ } catch (Exception e) {
+ int errCode = 2148;
+ String msg = "Error cloning POPackageLite for limit after sort";
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ mro.reducePlan.add(pkg);
+ mro.reducePlan.addAsLeaf(getPlainForEachOP());
+ }
+
public void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
{
- PhysicalPlan ep = new PhysicalPlan();
+ PhysicalPlan ep = new PhysicalPlan();
POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
prjStar.setResultType(DataType.TUPLE);
prjStar.setStar(true);
@@ -727,8 +756,8 @@
try {
lr.setIndex(0);
} catch (ExecException e) {
- int errCode = 2058;
- String msg = "Unable to set index on the newly created
POLocalRearrange.";
+ int errCode = 2058;
+ String msg = "Unable to set index on the newly created
POLocalRearrange.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
lr.setKeyType(DataType.TUPLE);
@@ -744,6 +773,11 @@
pkg.setInner(inner);
mro.reducePlan.add(pkg);
+ mro.reducePlan.addAsLeaf(getPlainForEachOP());
+ }
+
+ public POForEach getPlainForEachOP()
+ {
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat1 = new ArrayList<Boolean>();
PhysicalPlan ep1 = new PhysicalPlan();
@@ -755,11 +789,10 @@
ep1.add(prj1);
eps1.add(ep1);
flat1.add(true);
- POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
+ POForEach fe = new POForEach(new OperatorKey(scope, nig
.getNextNodeId(scope)), -1, eps1, flat1);
- nfe1.setResultType(DataType.BAG);
-
- mro.reducePlan.addAsLeaf(nfe1);
+ fe.setResultType(DataType.BAG);
+ return fe;
}
public void visitLimit(POLimit op) throws VisitorException{
@@ -1719,7 +1752,10 @@
POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit.setLimit(mr.limit);
limitAdjustMROp.mapPlan.addAsLeaf(pLimit);
- simpleConnectMapToReduce(limitAdjustMROp);
+ if (mr.isGlobalSort())
+ connectMapToReduceLimitedSort(limitAdjustMROp, mr);
+ else
+ simpleConnectMapToReduce(limitAdjustMROp);
POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
pLimit2.setLimit(mr.limit);
limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Jun 19 18:02:44 2009
@@ -303,17 +303,13 @@
}
/**
- * Make a deep copy of this operator. This is declared here to make it
- * public for all physical operators. However, the default
- * implementation is to throw an exception. Operators we expect to clone
- * need to implement this method.
+ * Make a deep copy of this operator. The default implementation now
+ * delegates to Object.clone(), giving subclasses a base copy to build on.
* @throws CloneNotSupportedException
*/
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
- String s = new String("This physical operator does not " +
- "implement clone.");
- throw new CloneNotSupportedException(s);
+ return (PhysicalOperator)super.clone();
}
protected void cloneHelper(PhysicalOperator op) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Jun 19 18:02:44 2009
@@ -321,15 +321,21 @@
*/
@Override
public POPackage clone() throws CloneNotSupportedException {
- POPackage clone = new POPackage(new OperatorKey(mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)));
+ POPackage clone = (POPackage)super.clone();
+ clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
+ clone.requestedParallelism = requestedParallelism;
clone.resultType = resultType;
clone.keyType = keyType;
clone.numInputs = numInputs;
- clone.inner = new boolean[inner.length];
- for (int i = 0; i < inner.length; i++) {
- clone.inner[i] = inner[i];
+ if (inner!=null)
+ {
+ clone.inner = new boolean[inner.length];
+ for (int i = 0; i < inner.length; i++) {
+ clone.inner[i] = inner[i];
+ }
}
+ else
+ clone.inner = null;
return clone;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=786607&r1=786606&r2=786607&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Jun 19 18:02:44 2009
@@ -347,4 +347,36 @@
assertEquals(10, numIdentity);
}
+ @Test
+ public void testLimitAfterSort() throws Exception{
+ int LOOP_COUNT = 40;
+ File tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ Random r = new Random(1);
+ int rand;
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ rand = r.nextInt(100);
+ ps.println(rand);
+ }
+ ps.close();
+
+ pigServer.registerQuery("A = LOAD '" +
Util.generateURI(tmpFile.toString()) + "' AS (num:int);");
+ pigServer.registerQuery("B = order A by num parallel 2;");
+ pigServer.registerQuery("C = limit B 10;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ if(!iter.hasNext()) fail("No output found");
+ int numIdentity = 0;
+ int oldNum = Integer.MIN_VALUE;
+ int newNum;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ newNum = (Integer)t.get(0);
+ assertTrue(newNum>=oldNum);
+ oldNum = newNum;
+ ++numIdentity;
+ }
+ assertEquals(10, numIdentity);
+ }
+
+
}