Author: rding
Date: Wed Feb 24 18:13:05 2010
New Revision: 915907

URL: http://svn.apache.org/viewvc?rev=915907&view=rev
Log:
PIG-1079: Modify merge join to use distributed cache to maintain the index

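In outline, the change works as follows: MRCompiler records the name of the index file (built by the sampling job) on the POMergeJoin operator, JobControlCompiler ships that file to the task nodes through Hadoop's distributed cache and rewrites the operator's index path to the cache symlink, and DefaultIndexableLoader then reads the index from the task's local working directory instead of re-opening it on HDFS in every map task. Below is a minimal sketch of the shipping side using the stock Hadoop DistributedCache API; the path and link name are made up, and Pig's own helper (addSingleFileToDistributedCache in JobControlCompiler) may differ in detail:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class ShipIndexSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();

            // Hypothetical HDFS location where the index-building job left the
            // index, and the link name tasks will see in their working directory.
            String indexOnDfs = "/tmp/temp-1234/tmp-5678";
            String symlink = "indexfile_" + System.currentTimeMillis();

            // The "#symlink" fragment asks the TaskTracker to expose the cached
            // file under that name in each task's working directory.
            DistributedCache.addCacheFile(new URI(indexOnDfs + "#" + symlink), conf);
            DistributedCache.createSymlink(conf);

            // The join operator would then carry the symlink name rather than the
            // DFS path, which is what join.setIndexFile(symlink) does in the
            // JobControlCompiler change below.
        }
    }
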
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 24 18:13:05 2010
@@ -133,6 +133,9 @@
 
 BUG FIXES
 
+PIG-1079: Modify merge join to use distributed cache to maintain the index
+(rding)
+
 PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
 UDF (yinghe via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 24 18:13:05 2010
@@ -59,6 +59,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -221,26 +222,6 @@
         return new Path(pathStr);
     }
 
-    private Path makeTmpPath() throws IOException {
-        Path tmpPath = null;
-        for (int tries = 0;;) {
-            try {
-                tmpPath = 
-                    new Path(FileLocalizer
-                             .getTemporaryPath(null, pigContext).toString());
-                FileSystem fs = tmpPath.getFileSystem(conf);
-                tmpPath = tmpPath.makeQualified(fs);
-                fs.mkdirs(tmpPath);
-                break;
-            } catch (IOException ioe) {
-                if (++tries==100) {
-                    throw ioe;
-                }
-            }
-        }
-        return tmpPath;
-    }
-
     /**
      * Compiles all jobs that have no dependencies removes them from
      * the plan and returns. Should be called with the same plan until
@@ -520,7 +501,7 @@
             // this call modifies the ReplFiles names of POFRJoin operators
             // within the MR plans, must be called before the plans are
             // serialized
-            setupDistributedCacheForFRJoin(mro, pigContext, conf);
+            setupDistributedCacheForJoin(mro, pigContext, conf);
 
             POPackage pack = null;
             if(mro.reducePlan.isEmpty()){
@@ -653,13 +634,13 @@
     }
     
     public static class PigSecondaryKeyGroupComparator extends WritableComparator {
-        @SuppressWarnings("unchecked")
         public PigSecondaryKeyGroupComparator() {
 //            super(TupleFactory.getInstance().tupleClass(), true);
             super(NullableTuple.class, true);
         }
 
-        @Override
+        @SuppressWarnings("unchecked")
+        @Override
         public int compare(WritableComparable a, WritableComparable b)
         {
             PigNullableWritable wa = (PigNullableWritable)a;
@@ -947,13 +928,13 @@
         }
     }
 
-    private void setupDistributedCacheForFRJoin(MapReduceOper mro,
+    private void setupDistributedCacheForJoin(MapReduceOper mro,
             PigContext pigContext, Configuration conf) throws IOException {    
   
                     
-        new FRJoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
+        new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
                 .visit();
              
-        new FRJoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
+        new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
                 .visit();
     }
 
@@ -1056,13 +1037,13 @@
         return symlink;
     }
     
-    private static class FRJoinDistributedCacheVisitor extends PhyPlanVisitor {
+    private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
                  
         private PigContext pigContext = null;
                 
          private Configuration conf = null;
          
-         public FRJoinDistributedCacheVisitor(PhysicalPlan plan, 
+         public JoinDistributedCacheVisitor(PhysicalPlan plan, 
                  PigContext pigContext, Configuration conf) {
              super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
                      plan));
@@ -1110,6 +1091,29 @@
                  throw new VisitorException(msg, e);
              }
          }
+         
+         @Override
+         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+             
+             // XXX Hadoop currently doesn't support distributed cache in local mode.
+             // This line will be removed after the support is added
+             if (pigContext.getExecType() == ExecType.LOCAL) return;
+             
+             String indexFile = join.getIndexFile();
+             
+             // merge join may not use an index file
+             if (indexFile == null) return;
+             
+             try {
+                String symlink = addSingleFileToDistributedCache(pigContext,
+                        conf, indexFile, "indexfile_");
+                join.setIndexFile(symlink);
+            } catch (IOException e) {
+                String msg = "Internal error. Distributed cache could not " +
+                        "be set up for merge join index file";
+                throw new VisitorException(msg, e);
+            }
+         }
      }
     
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 24 18:13:05 2010
@@ -1286,7 +1286,9 @@
                 defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
                 defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
                 joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
-                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());    
+                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());  
+                
+                joinOp.setIndexFile(strFile.getFileName());
                  
             }
             

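A note on the one-line MRCompiler change: strFile is presumably the temporary FileSpec into which the index-building MR job stores the merge-join index; recording its name on the POMergeJoin operator is what later allows JobControlCompiler to swap that path for a distributed-cache symlink before the plan is serialized.
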
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 24 18:13:05 2010
@@ -40,6 +40,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -87,6 +88,8 @@
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
+    
+    private String indexFile;
 
     // Buffer to hold accumulated left tuples.
     private List<Tuple> leftTuples;
@@ -131,6 +134,7 @@
         mTupleFactory = TupleFactory.getInstance();
         leftTuples = new ArrayList<Tuple>(arrayListSize);
         this.createJoinPlans(inpPlans,keyTypes);
+        this.indexFile = null;
     }
 
     /**
@@ -384,9 +388,15 @@
         }
     }
     
-    @SuppressWarnings("unchecked")
     private void seekInRightStream(Object firstLeftKey) throws IOException{
         rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        
+        // check if hadoop distributed cache is used
+        if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
+            DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
+            loader.setIndexFile(indexFile);
+        }
+        
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         Configuration conf = new Configuration(PigMapReduce.sJobConf);
@@ -545,4 +555,12 @@
     public void setSignature(String signature) {
         this.signature = signature;
     }
+
+    public void setIndexFile(String indexFile) {
+        this.indexFile = indexFile;
+    }
+
+    public String getIndexFile() {
+        return indexFile;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Wed Feb 24 18:13:05 2010
@@ -21,10 +21,13 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadCaster;
@@ -37,6 +40,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -64,7 +68,7 @@
     // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
     private LinkedList<Tuple> index;
     private FuncSpec rightLoaderFuncSpec;
-    private PigContext pc;
+
     private String scope;
     private Tuple dummyTuple = null;
     private transient TupleFactory mTupleFactory;
@@ -105,14 +109,11 @@
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
-        try {
-            pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
-        } catch (IOException e) {
-            int errCode = 2094;
-            String msg = "Unable to deserialize pig context.";
-            throw new ExecException(msg,errCode,e);
-        }
-        pc.connect();
+                
+        Properties props = new Properties();
+        props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ 
+        PigContext pc = new PigContext(ExecType.LOCAL, props);
         ld.setPc(pc);
         index = new LinkedList<Tuple>();
         for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
@@ -191,6 +192,8 @@
     }
     
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
+        PigContext pc = (PigContext) ObjectSerializer
+                .deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
         //create ReadToEndLoader that will read the given splits in order
         loader = new ReadToEndLoader(
                 (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
@@ -255,4 +258,8 @@
         // nothing to do
     }
     
+    public void setIndexFile(String indexFile) {
+        this.indexFile = indexFile;
+    }
+
 }

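On the read side, DefaultIndexableLoader now builds a throwaway local-mode PigContext whose default file system (MapRedUtil.FILE_SYSTEM_NAME, i.e. fs.default.name) is "file:///", so an index path that has been rewritten to a distributed-cache symlink resolves against the task's working directory. A rough illustration of that resolution with the plain Hadoop FileSystem API; the link name is hypothetical and nothing below is taken from the commit itself:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocalIndexLookupSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Same idea as the Properties handed to the local PigContext above:
            // make the local file system the default one.
            conf.set("fs.default.name", "file:///");

            FileSystem fs = FileSystem.get(conf);
            // Inside an MR task this relative name would be the symlink created
            // from the distributed cache; here it is just an illustrative name.
            Path index = new Path("indexfile_1234567890");

            if (fs.exists(index)) {
                System.out.println("index found locally: "
                        + fs.getFileStatus(index).getLen() + " bytes");
            } else {
                System.out.println("no local index file named " + index);
            }
        }
    }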
