Author: gates
Date: Fri Oct 16 19:20:31 2009
New Revision: 826047
URL: http://svn.apache.org/viewvc?rev=826047&view=rev
Log:
PIG-858: Order By followed by "replicated" join fails while compiling MR-plan
from physical plan.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
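For context, the failing scenario this patch addresses looks roughly like the sketch below: each join input is sorted with ORDER BY and then joined using "replicated". This is a hypothetical standalone repro modeled on the new testSortFRJoin test added in this commit; the class name, input path, and schema are illustrative assumptions, not part of the patch.

    import java.util.Iterator;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class ReplicatedJoinAfterOrderBy {
        public static void main(String[] args) throws Exception {
            // Assumed: a reachable MapReduce cluster and a two-column int
            // input file; 'input.txt' is a placeholder path.
            PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
            pigServer.registerQuery("A = LOAD 'input.txt' as (x:int,y:int);");
            pigServer.registerQuery("B = LOAD 'input.txt' as (x:int,y:int);");
            pigServer.registerQuery("D = ORDER A by y;");
            pigServer.registerQuery("E = ORDER B by y;");
            // Before this fix, compiling the MR plan for this replicated join
            // failed; the patch reworks the fragment lookup in MRCompiler via
            // the new phyToMROpMap.
            pigServer.registerQuery("C = join D by $0, E by $0 using \"replicated\";");
            Iterator<Tuple> iter = pigServer.openIterator("C");
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
        }
    }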
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 16 19:20:31 2009
@@ -57,6 +57,9 @@
BUG FIXES
+PIG-858: Order By followed by "replicated" join fails while compiling MR-plan
+from physical plan (ashutoshc via gates)
+
PIG-968: Fix findContainingJar to work properly when there is a + in the jar
path (tlipcon via gates).
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 16 19:20:31 2009
@@ -55,6 +55,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
@@ -167,6 +168,8 @@
private CompilationMessageCollector messageCollector = null;
+ private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
+
public static String USER_COMPARATOR_MARKER = "user.comparator.func:";
public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
@@ -193,6 +196,7 @@
scope = roots.get(0).getOperatorKey().getScope();
messageCollector = new CompilationMessageCollector() ;
storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
+ phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
}
public void randomizeFileLocalizer(){
@@ -325,6 +329,7 @@
plan.disconnect(op, p);
MRPlan.connect(oper, curMROp);
+ phyToMROpMap.put(op, curMROp);
return;
}
@@ -351,6 +356,7 @@
curMROp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
}
MRPlan.add(curMROp);
+ phyToMROpMap.put(op, curMROp);
return;
}
@@ -690,6 +696,7 @@
* @param op - The split operator
* @throws VisitorException
*/
+ @Override
public void visitSplit(POSplit op) throws VisitorException{
try{
FileSpec fSpec = op.getSplitStore();
@@ -697,6 +704,7 @@
mro.setSplitter(true);
splitsSeen.put(op.getOperatorKey(), mro);
curMROp = startNew(fSpec, mro);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -704,9 +712,11 @@
}
}
+ @Override
public void visitLoad(POLoad op) throws VisitorException{
try{
nonBlocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -714,10 +724,12 @@
}
}
+ @Override
public void visitStore(POStore op) throws VisitorException{
try{
storeToMapReduceMap.put(op, curMROp);
nonBlocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -725,10 +737,12 @@
}
}
+ @Override
public void visitFilter(POFilter op) throws VisitorException{
try{
nonBlocking(op);
addUDFs(op.getPlan());
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -736,9 +750,11 @@
}
}
+ @Override
public void visitStream(POStream op) throws VisitorException{
try{
nonBlocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -829,6 +845,7 @@
return fe;
}
+ @Override
public void visitLimit(POLimit op) throws VisitorException{
try{
@@ -860,6 +877,7 @@
messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
}
+ phyToMROpMap.put(op, mro);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -867,6 +885,7 @@
}
}
+ @Override
public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
try{
nonBlocking(op);
@@ -874,6 +893,7 @@
if(plans!=null)
for(PhysicalPlan ep : plans)
addUDFs(ep);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -881,6 +901,7 @@
}
}
+ @Override
public void visitPOForEach(POForEach op) throws VisitorException{
try{
nonBlocking(op);
@@ -889,6 +910,7 @@
for (PhysicalPlan plan : plans) {
addUDFs(plan);
}
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -896,9 +918,11 @@
}
}
+ @Override
public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
try{
blocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -906,9 +930,11 @@
}
}
+ @Override
public void visitPackage(POPackage op) throws VisitorException{
try{
nonBlocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -916,9 +942,11 @@
}
}
+ @Override
public void visitUnion(POUnion op) throws VisitorException{
try{
nonBlocking(op);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -943,22 +971,14 @@
}
op.setReplFiles(replFiles);
- List<OperatorKey> opKeys = new
ArrayList<OperatorKey>(op.getInputs().size());
- for (PhysicalOperator pop : op.getInputs()) {
- opKeys.add(pop.getOperatorKey());
- }
- int fragPlan = 0;
+
+ curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
for(int i=0;i<compiledInputs.length;i++){
MapReduceOper mro = compiledInputs[i];
- OperatorKey opKey = (!mro.isMapDone()) ? mro.mapPlan.getLeaves().get(0).getOperatorKey()
- : mro.reducePlan.getLeaves().get(0).getOperatorKey();
- if(opKeys.indexOf(opKey)==op.getFragment()){
- curMROp = mro;
- fragPlan = i;
+ if(curMROp.equals(mro))
continue;
- }
POStore str = getStore();
- str.setSFile(replFiles[opKeys.indexOf(opKey)]);
+ str.setSFile(replFiles[i]);
if (!mro.isMapDone()) {
mro.mapPlan.addAsLeaf(str);
mro.setMapDoneSingle(true);
@@ -966,13 +986,10 @@
mro.reducePlan.addAsLeaf(str);
mro.setReduceDone(true);
} else {
- int errCode = 2022;
+ int errCode = 2022;
String msg = "Both map and reduce phases have been done.
This is unexpected while compiling.";
throw new PlanException(msg, errCode, PigException.BUG);
}
- }
- for(int i=0;i<compiledInputs.length;i++){
- if(i==fragPlan) continue;
MRPlan.connect(compiledInputs[i], curMROp);
}
@@ -996,6 +1013,7 @@
curMROp.setFrjoin(true);
curMROp.setFragment(op.getFragment());
curMROp.setReplFiles(op.getReplFiles());
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -1227,6 +1245,7 @@
// We want to ensure indexing job runs prior to actual join job. So, connect them in order.
MRPlan.connect(rightMROpr, curMROp);
+ phyToMROpMap.put(joinOp, curMROp);
}
catch(PlanException e){
int errCode = 2034;
@@ -1298,6 +1317,7 @@
nfe1.setResultType(DataType.BAG);
curMROp.reducePlan.addAsLeaf(nfe1);
curMROp.setNeedsDistinctCombiner(true);
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -1305,6 +1325,7 @@
}
}
+ @Override
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
try {
if (compiledInputs.length != 2) {
@@ -1452,6 +1473,7 @@
fe.visit(this);
curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
+ phyToMROpMap.put(op, curMROp);
}catch(PlanException e) {
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
@@ -1480,6 +1502,7 @@
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
}
+ phyToMROpMap.put(op, curMROp);
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " +
op.getClass().getSimpleName();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java?rev=826047&r1=826046&r2=826047&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin.java Fri Oct 16 19:20:31 2009
@@ -146,6 +146,58 @@
}
+ public void testSortFRJoin() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("D = ORDER A by y;");
+ pigServer.registerQuery("E = ORDER B by y;");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join D by $0, E by $0 using
\"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join D by $0, E by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbfrj.size(), dbshj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ public void testDistinctFRJoin() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as
(x:int,y:int);");
+ pigServer.registerQuery("D = distinct A ;");
+ pigServer.registerQuery("E = distinct B ;");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join D by $0, E by $0 using
\"replicated\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join D by $0, E by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbfrj.size(), dbshj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
@Test
public void testUDFFRJ() throws IOException {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as
(x:chararray,y:int);");