Author: pradeepkth
Date: Fri Feb 19 23:48:11 2010
New Revision: 912038

URL: http://svn.apache.org/viewvc?rev=912038&view=rev
Log:
 PIG-1218: Use distributed cache to store samples (rding via pradeepkth)
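
In short: instead of having every map or reduce task read the sampler outputs (sort quantiles, skew-join key distribution, fragment-replicate join inputs) straight from HDFS, the files are now registered in Hadoop's DistributedCache under generated symlink names, and tasks read them from their local working directory. A minimal sketch of the underlying mechanism, assuming a hypothetical path and link name (this is not Pig's setupDistributedCache helper, whose body is unchanged and not shown in this diff):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class CacheSymlinkSketch {
        public static void cacheWithSymlink(Configuration conf) throws Exception {
            // a "#name" fragment on the cached URI becomes a symlink named
            // "name" in each task's working directory
            DistributedCache.addCacheFile(
                    new URI("/tmp/temp-1/tmp-42#pigsample_12345_1266623291000"), conf);
            // symlink creation must be enabled explicitly
            DistributedCache.createSymlink(conf);
        }
    }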

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 19 23:48:11 2010
@@ -55,6 +55,8 @@
 
 IMPROVEMENTS
 
+PIG-1218: Use distributed cache to store samples (rding via pradeepkth)
+
 PIG-1226: suuport for additional jar files (thejas via olgan)
 
 PIG-1230: Streaming input in POJoinPackage should use nonspillable bag to

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 19 23:48:11 2010
@@ -43,6 +43,7 @@
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ComparisonFunc;
+import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
@@ -54,6 +55,9 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -75,11 +79,14 @@
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 
+
 /**
  * This is compiler class that takes an MROperPlan and converts
  * it into a JobControl object with the relevant dependency info
@@ -509,6 +516,11 @@
             // currently the parent plan is really used only when POStream is present in the plan
             new PhyPlanSetter(mro.mapPlan).visit();
             new PhyPlanSetter(mro.reducePlan).visit();
+            
+            // this call modifies the ReplFiles names of POFRJoin operators
+            // within the MR plans, must be called before the plans are
+            // serialized
+            setupDistributedCacheForFRJoin(mro, pigContext, conf);
 
             POPackage pack = null;
             if(mro.reducePlan.isEmpty()){
@@ -582,7 +594,9 @@
                 // Only set the quantiles file and sort partitioner if we're a
                 // global sort, not for limit after sort.
                 if (mro.isGlobalSort()) {
-                    conf.set("pig.quantilesFile", mro.getQuantFile());
+                    String symlink = addSingleFileToDistributedCache(
+                            pigContext, conf, mro.getQuantFile(), "pigsample");
+                    conf.set("pig.quantilesFile", symlink);
                     nwJob.setPartitionerClass(WeightedRangePartitioner.class);
                 }
                 
@@ -613,32 +627,15 @@
             }
             
             if (mro.isSkewedJoin()) {
-                conf.set("pig.keyDistFile", mro.getSkewedJoinPartitionFile());
+                String symlink = addSingleFileToDistributedCache(pigContext,
+                        conf, mro.getSkewedJoinPartitionFile(), "pigdistkey");
+                conf.set("pig.keyDistFile", symlink);
                 nwJob.setPartitionerClass(SkewedPartitioner.class);
                 nwJob.setMapperClass(PigMapReduce.MapWithPartitionIndex.class);
                 nwJob.setMapOutputKeyClass(NullablePartitionWritable.class);
                 nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
             }
       
-            if (mro.isFrjoin()) {
-                // set up distributed cache for the replicated files
-                FileSpec[] replFiles = mro.getReplFiles();
-                ArrayList<String> replicatedPath = new ArrayList<String>();
-                // the first input is not replicated
-                for(int i=0; i < replFiles.length; i++) {
-                    // ignore fragmented file
-                    if (i != mro.getFragment()) {
-                        replicatedPath.add(replFiles[i].getFileName());
-                    }
-                }
-                try {
-                    setupDistributedCache(pigContext, conf, replicatedPath.toArray(new String[0]) , false);
-                } catch (IOException e) {
-                    String msg = "Internal error. Distributed cache could not be set up for the replicated files";
-                    throw new IOException (msg, e);
-                }
-            }
-
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);
             Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
@@ -950,6 +947,16 @@
         }
     }
 
+    private void setupDistributedCacheForFRJoin(MapReduceOper mro,
+            PigContext pigContext, Configuration conf) throws IOException {
+
+        new FRJoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
+                .visit();
+             
+        new FRJoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
+                .visit();
+    }
+
     private static void setupDistributedCache(PigContext pigContext,
                                               Configuration conf, 
                                              Properties properties, String key, 
@@ -1022,4 +1029,86 @@
             }
         }        
     }
+    
+    private static String addSingleFileToDistributedCache(
+            PigContext pigContext, Configuration conf, String filename,
+            String prefix) throws IOException {
+
+        if (!FileLocalizer.fileExists(filename, pigContext)) {
+            throw new IOException(
+                    "Internal error: skew join partition file "
+                    + filename + " does not exist");
+        }
+                     
+        String symlink = filename;
+                     
+        // XXX Hadoop currently doesn't support distributed cache in local mode.
+        // This line will be removed after the support is added by Hadoop team.
+        if (pigContext.getExecType() != ExecType.LOCAL) {
+            symlink = prefix + "_" 
+                    + Integer.toString(System.identityHashCode(filename)) + "_"
+                    + Long.toString(System.currentTimeMillis());
+            filename = filename + "#" + symlink;
+            setupDistributedCache(pigContext, conf, new String[] { filename },
+                    false);  
+        }
+         
+        return symlink;
+    }
+    
+    private static class FRJoinDistributedCacheVisitor extends PhyPlanVisitor {
+                 
+        private PigContext pigContext = null;
+                
+         private Configuration conf = null;
+         
+         public FRJoinDistributedCacheVisitor(PhysicalPlan plan, 
+                 PigContext pigContext, Configuration conf) {
+             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                     plan));
+             this.pigContext = pigContext;
+             this.conf = conf;
+         }
+         
+         public void visitFRJoin(POFRJoin join) throws VisitorException {
+             
+             // XXX Hadoop currently doesn't support distributed cache in local mode.
+             // This line will be removed after the support is added
+             if (pigContext.getExecType() == ExecType.LOCAL) return;
+             
+             // set up distributed cache for the replicated files
+             FileSpec[] replFiles = join.getReplFiles();
+             ArrayList<String> replicatedPath = new ArrayList<String>();
+             
+             FileSpec[] newReplFiles = new FileSpec[replFiles.length];
+             
+             // the first input is not replicated
+             for (int i = 0; i < replFiles.length; i++) {
+                 // ignore fragmented file
+                 String symlink = "";
+                 if (i != join.getFragment()) {
+                     symlink = "pigrepl_" + join.getOperatorKey().toString() + 
"_"
+                         + 
Integer.toString(System.identityHashCode(replFiles[i].getFileName()))
+                         + "_" + Long.toString(System.currentTimeMillis()) 
+                         + "_" + i;
+                     replicatedPath.add(replFiles[i].getFileName() + "#"
+                             + symlink);
+                 }
+                 newReplFiles[i] = new FileSpec(symlink, 
+                         (replFiles[i] == null ? null : replFiles[i].getFuncSpec()));
+             }
+             
+             join.setReplFiles(newReplFiles);
+             
+             try {
+                 setupDistributedCache(pigContext, conf, replicatedPath
+                         .toArray(new String[0]), false);
+             } catch (IOException e) {
+                 String msg = "Internal error. Distributed cache could not " +
+                               "be set up for the replicated files";
+                 throw new VisitorException(msg, e);
+             }
+         }
+     }
+    
 }
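
The contract of the new addSingleFileToDistributedCache helper, as the code above suggests: in map-reduce mode it returns a generated symlink name that tasks can open from their working directory; in local mode it returns the path unchanged, since Hadoop's distributed cache is not yet supported there. A hedged usage sketch (the sampler output path is hypothetical):

    // inside JobControlCompiler, after the sampling job has produced quantFile
    String quantFile = "/tmp/temp-1/tmp-42";           // hypothetical sampler output
    String symlink = addSingleFileToDistributedCache(
            pigContext, conf, quantFile, "pigsample"); // e.g. "pigsample_<idhash>_<millis>"
    conf.set("pig.quantilesFile", symlink);            // tasks resolve this in their cwd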

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 19 23:48:11 2010
@@ -1029,9 +1029,6 @@
                             addUDFs(plan);
                         }
                 }
-            curMROp.setFrjoin(true);
-            curMROp.setFragment(op.getFragment());
-            curMROp.setReplFiles(op.getReplFiles());
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
             int errCode = 2034;
@@ -1491,7 +1488,6 @@
-                       String msg = "Unable to set index on newly created POLocalRearrange.";
-                       throw new PlanException(msg, errCode, PigException.BUG, e);
                        }               
-                       ((POPartitionRearrange)lr).setPartitionFile(partitionFile.getFileName());
                        
                        groups = (List<PhysicalPlan>)joinPlans.get(l.get(1));
                        lr.setPlans(groups);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 19 23:48:11 2010
@@ -116,12 +116,6 @@
 
     private String scope;
     
-    
-    //Fragment Replicate Join State
-    boolean frjoin = false;
-    FileSpec[] replFiles = null;
-    int fragment = -1;
-    
     int requestedParallelism = -1;
     
     // Last POLimit value in this map reduce operator, needed by LimitAdjuster
@@ -369,29 +363,6 @@
     public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
         this.endOfAllInputInReduce = endOfAllInputInReduce;
     }    
-    public int getFragment() {
-        return fragment;
-    }
-
-    public void setFragment(int fragment) {
-        this.fragment = fragment;
-    }
-
-    public boolean isFrjoin() {
-        return frjoin;
-    }
-
-    public void setFrjoin(boolean frjoin) {
-        this.frjoin = frjoin;
-    }
-
-    public FileSpec[] getReplFiles() {
-        return replFiles;
-    }
-
-    public void setReplFiles(FileSpec[] replFiles) {
-        this.replFiles = replFiles;
-    }
 
     public int getRequestedParallelism() {
         return requestedParallelism;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Feb 19 23:48:11 2010
@@ -107,7 +107,8 @@
 
         try {
             Integer [] redCnt = new Integer[1]; 
-            reducerMap = MapRedUtil.loadPartitionFile(keyDistFile, redCnt, job, DataType.TUPLE);
+            reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+                    keyDistFile, redCnt, DataType.TUPLE);
             totalReducers = redCnt[0];
         } catch (Exception e) {
             throw new RuntimeException(e);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 19 23:48:11 2010
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,21 +29,17 @@
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableDoubleWritable;
@@ -54,8 +49,11 @@
 import org.apache.pig.impl.io.NullableText;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.Pair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, 
Writable>   
                                       implements Configurable {
@@ -64,6 +62,8 @@
     public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> 
weightedParts 
         = new HashMap<PigNullableWritable, 
DiscreteProbabilitySampleGenerator>();
     
+    private static final Log log = 
LogFactory.getLog(WeightedRangePartitioner.class);
+    
     Configuration job;
 
     @SuppressWarnings("unchecked")
@@ -90,15 +90,23 @@
     @Override
     public void setConf(Configuration configuration) {
         job = configuration;
+        
+        String quantilesFile = configuration.get("pig.quantilesFile", "");
 
-        String quantilesFile = job.get("pig.quantilesFile", "");
-
-        if (quantilesFile.length() == 0)
-            throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
+        if (quantilesFile.length() == 0) {
+            throw new RuntimeException(this.getClass().getSimpleName()
+                    + " used but no quantiles found");
+        }
         
         try{
-            ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job, 
-                    quantilesFile, 0);
+            
+            
+            // use local file system to get the quantilesFile
+            Configuration conf = new Configuration(false);            
+            conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+            
+            ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(),
+                    conf, quantilesFile, 0);
             DataBag quantilesList;
             Tuple t = loader.getNext();
             if(t!=null)
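
The change to setConf above is the task-side half of the quantiles handling: "pig.quantilesFile" now holds a DistributedCache symlink, so it must be opened through the local file system rather than the job's default (HDFS) file system. A hedged recap of the pattern, assuming jobConf is the task's job configuration:

    // read the cached sample from the task's working directory
    Configuration local = new Configuration(false);
    local.set("fs.default.name", "file:///");    // MapRedUtil.FILE_SYSTEM_NAME
    String quantilesFile = jobConf.get("pig.quantilesFile", "");
    ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), local, quantilesFile, 0);
    Tuple t = loader.getNext();                  // first tuple carries the quantiles bag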

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 19 23:48:11 2010
@@ -18,14 +18,20 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +43,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.NonSpillableDataBag;
@@ -48,6 +55,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
  * The operator models the join keys using the Local Rearrange operators which
@@ -67,7 +75,7 @@
      * 
      */
     private static final long serialVersionUID = 1L;
-    static private Log log = LogFactory.getLog(POFRJoin.class);
+    static private Log log = LogFactory.getLog(POFRJoin.class);          
     // The number in the input list which denotes the fragmented input
     private int fragment;
     // There can be n inputs each being a List<PhysicalPlan>
@@ -311,9 +319,11 @@
 
             POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
                     replFile);
-            PigContext pc = new PigContext(ExecType.MAPREDUCE,
-                    ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
-            pc.connect();
+            
+            Properties props = new Properties();
+            props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+                         
+            PigContext pc = new PigContext(ExecType.LOCAL, props);   
             ld.setPc(pc);
             // We use LocalRearrange Operator to seperate Key and Values
             // eg. ( a, b, c ) would generate a, ( a, b, c )
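
The PigContext switch above follows from the JobControlCompiler change: replFile now names a DistributedCache symlink (of the form "pigrepl_<operatorKey>_<idhash>_<millis>_<i>") in the task's working directory, so the replicated input is loaded through the local file system instead of reconnecting to HDFS. A hedged sketch of the resulting setup, with replFile taken from the rewritten ReplFiles array:

    // load the symlinked replicated file from the task-local file system
    POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile);
    Properties props = new Properties();
    props.setProperty("fs.default.name", "file:///"); // MapRedUtil.FILE_SYSTEM_NAME
    PigContext pc = new PigContext(ExecType.LOCAL, props);
    ld.setPc(pc);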

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Feb 19 23:48:11 2010
@@ -17,43 +17,26 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
-import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-
-
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.builtin.BinStorage;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.TupleFactory;
-
-import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 
 /**
@@ -68,7 +51,7 @@
      * 
      */
     private static final long serialVersionUID = 1L;
-    private String partitionFile;
+    
     private Integer totalReducers = -1;
     // ReducerMap will store the tuple, max reducer index & min reducer index
     private static Map<Object, Pair<Integer, Integer> > reducerMap = new 
HashMap<Object, Pair<Integer, Integer> >();
@@ -95,26 +78,20 @@
         leafOps = new ArrayList<ExpressionOperator>();
     }
 
-       /* Returns the name for the partition sampling file */
-       public String getPartitionFile() {
-               return partitionFile;
-       }
-
-       /* Set the partition sampling file */
-       public void setPartitionFile(String file) {
-               partitionFile = file;
-       }
-
     /* Loads the key distribution file obtained from the sampler */
     private void loadPartitionFile() throws RuntimeException {
+        String keyDistFile = PigMapReduce.sJobConf.get("pig.keyDistFile", "");
+        if (keyDistFile.isEmpty()) {
+            throw new RuntimeException(
+            "Internal error: missing key distribution file property.");
+        }
+
         try {
             Integer [] redCnt = new Integer[1]; 
             
-            reducerMap = MapRedUtil.loadPartitionFile(partitionFile, 
-                    redCnt, 
-                    ConfigurationUtil.toConfiguration(pigContext.getProperties()),
-                    DataType.NULL
-            );
+            reducerMap = MapRedUtil.loadPartitionFileFromLocalCache(
+                    keyDistFile, redCnt, DataType.NULL);
+
             totalReducers = redCnt[0];
             loaded = true;
         } catch (Exception e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 19 23:48:11 2010
@@ -23,6 +23,8 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.Progressable;
@@ -46,6 +48,10 @@
  */
 public class MapRedUtil {
 
+    private static Log log = LogFactory.getLog(MapRedUtil.class);
+         
+    public static final String FILE_SYSTEM_NAME = "fs.default.name";
+
     /**
      * This method is to be called from an 
      * {@link org.apache.hadoop.mapred.OutputFormat#getRecordWriter(
@@ -84,12 +90,17 @@
      * @param keyType Type of the key to be stored in the return map. It currently treats Tuple as a special case.
      */
     @SuppressWarnings("unchecked")
-    public static <E> Map<E, Pair<Integer, Integer> > loadPartitionFile(String 
keyDistFile,
-                                        Integer[] totalReducers, Configuration 
job, byte keyType) throws IOException {
+    public static <E> Map<E, Pair<Integer, Integer>> 
loadPartitionFileFromLocalCache(
+            String keyDistFile, Integer[] totalReducers, byte keyType)
+            throws IOException {
+
+        Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, 
Pair<Integer, Integer>>();
 
-        Map<E, Pair<Integer, Integer> > reducerMap = new HashMap<E, 
Pair<Integer, Integer> >();
+        // use local file system to get the keyDistFile
+        Configuration conf = new Configuration(false);            
+        conf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
 
-        ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), job, 
+        ReadToEndLoader loader = new ReadToEndLoader(new BinStorage(), conf, 
                 keyDistFile, 0);
         DataBag partitionList;
         Tuple t = loader.getNext();
@@ -141,4 +152,5 @@
         udfc.addJobConf(job);
         udfc.deserialize();
     }
+
 }
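
Both consumers of the key-distribution sample (SkewedPartitioner, shown earlier, and POPartitionRearrange) now go through this renamed helper. A hedged recap of the call pattern, assuming jobConf is the task's job configuration:

    // fetch the symlink name recorded by JobControlCompiler, then parse the
    // sample from the local copy materialized by the distributed cache
    String keyDistFile = jobConf.get("pig.keyDistFile", "");
    Integer[] redCnt = new Integer[1];
    Map<Tuple, Pair<Integer, Integer>> reducerMap =
            MapRedUtil.loadPartitionFileFromLocalCache(keyDistFile, redCnt, DataType.TUPLE);
    int totalReducers = redCnt[0];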

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=912038&r1=912037&r2=912038&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Fri Feb 19 23:48:11 2010
@@ -25,8 +25,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -520,16 +522,17 @@
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
         // Get the sort job
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        jcc.updateMROpPlan(new ArrayList<Job>());
-        jobControl = jcc.compile(mrPlan, "Test");
-        jcc.updateMROpPlan(new ArrayList<Job>());
-        jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
+        Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
+        int counter = 0;
+        while (iter.hasNext()) {
+            MapReduceOper op = iter.next();
+            counter++;
+            if (op.isGlobalSort()) {
+                assertTrue(op.getRequestedParallelism()==100);
+            }
+        }
+        assertEquals(3, counter);
 
-        assertTrue(parallel==100);
-        
         pc.defaultParallel = -1;        
     }
     
@@ -552,15 +555,16 @@
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
         // Get the skew join job
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        jcc.updateMROpPlan(new ArrayList<Job>());
-        jobControl = jcc.compile(mrPlan, "Test");
-        jcc.updateMROpPlan(new ArrayList<Job>());
-        jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-
-        assertTrue(parallel==100);
+        Iterator<MapReduceOper> iter = mrPlan.getKeys().values().iterator();
+        int counter = 0;
+        while (iter.hasNext()) {
+            MapReduceOper op = iter.next();
+            counter++;
+            if (op.isSkewedJoin()) {
+                assertTrue(op.getRequestedParallelism()==100);
+            }
+        }
+        assertEquals(3, counter);
         
         pc.defaultParallel = -1;        
     }

