Author: rding
Date: Mon Aug 30 22:19:50 2010
New Revision: 991008

URL: http://svn.apache.org/viewvc?rev=991008&view=rev
Log:
PIG-1458: aggregate files for replicated join

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/conf/pig-default.properties
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=991008&r1=991007&r2=991008&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Aug 30 22:19:50 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1458: aggregate files for replicated join (rding)
+
 PIG-1205:  Enhance HBaseStorage-- Make it support loading row key and implement StoreFunc (zjffdu and dvryaboy)
 
 PIG-1568: Optimization rule FilterAboveForeach is too restrictive and doesn't

Modified: hadoop/pig/trunk/conf/pig-default.properties
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/conf/pig-default.properties?rev=991008&r1=991007&r2=991008&view=diff
==============================================================================
--- hadoop/pig/trunk/conf/pig-default.properties (original)
+++ hadoop/pig/trunk/conf/pig-default.properties Mon Aug 30 22:19:50 2010
@@ -28,3 +28,7 @@ pig.exec.reducers.max=999
 
 #Temporary location to store the intermediate data.
 pig.temp.dir=/tmp/
+
+#Threshold for merging FRJoin fragment files
+pig.frjoin.merge.files.threshold=100
+pig.frjoin.merge.files.optimistic=false

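Besides editing pig-default.properties, these keys can also be set per session. A minimal sketch, assuming the same PigServer/Properties API that the new TestFRJoin2 below exercises (the threshold value 50 is illustrative, not a recommendation):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;

    // Lower the file-merge threshold so a concatenate job is inserted sooner,
    // and let the compiler be optimistic when the file count is unknown.
    Properties props = new Properties();
    props.setProperty(MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, "50");
    props.setProperty(MRCompiler.FRJOIN_MERGE_FILES_OPTIMISTIC, "true");
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
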
Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=991008&r1=991007&r2=991008&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Aug 30 22:19:50 2010
@@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,7 +31,16 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -39,6 +50,7 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
@@ -75,6 +87,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -171,9 +184,17 @@ public class MRCompiler extends PhyPlanV
     private CompilationMessageCollector messageCollector = null;
     
     private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
-    
+        
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
    
+    private static final Log LOG = LogFactory.getLog(MRCompiler.class);
+    
+    public static final String FRJOIN_MERGE_FILES_THRESHOLD = "pig.frjoin.merge.files.threshold";
+    public static final String FRJOIN_MERGE_FILES_OPTIMISTIC = "pig.frjoin.merge.files.optimistic";
+    
+    private int frJoinFileMergeThreshold = 100;
+    private boolean frJoinOptimisticFileMerge = false;
+    
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
@@ -198,6 +219,13 @@ public class MRCompiler extends PhyPlanV
         scope = roots.get(0).getOperatorKey().getScope();
         messageCollector = new CompilationMessageCollector() ;
         phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
+        
+        frJoinFileMergeThreshold = Integer.parseInt(pigContext.getProperties()
+                .getProperty(FRJOIN_MERGE_FILES_THRESHOLD, "100"));
+        frJoinOptimisticFileMerge = pigContext.getProperties().getProperty(
+                FRJOIN_MERGE_FILES_OPTIMISTIC, "false").equals("true");
+        LOG.info("FRJoin file merge threshold: " + frJoinFileMergeThreshold
+                + " optimistic? " + frJoinOptimisticFileMerge);
     }
     
     public void connectScalars() throws PlanException {
@@ -1083,7 +1111,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+            
     /**
      * This is an operator which will have multiple inputs(= to number of join inputs)
      * But it prunes off all inputs but the fragment input and creates separate MR jobs
@@ -1109,18 +1137,44 @@ public class MRCompiler extends PhyPlanV
                     continue;
                 POStore str = getStore();
                 str.setSFile(replFiles[i]);
-                if (!mro.isMapDone()) {
-                    mro.mapPlan.addAsLeaf(str);
-                    mro.setMapDoneSingle(true);
+                
+                Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+                boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
+                
+                if (!mro.isMapDone()) {   
+                    if (combinable && hasTooManyInputFiles(mro, conf)) { 
+                        POStore tmpSto = getStore();
+                        FileSpec fSpec = getTempFileSpec();
+                        tmpSto.setSFile(fSpec);                         
+                        mro.mapPlan.addAsLeaf(tmpSto);
+                        mro.setMapDoneSingle(true);                    
+                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
+                        MRPlan.connect(catMROp, curMROp);
+                    } else {
+                        mro.mapPlan.addAsLeaf(str);
+                        mro.setMapDoneSingle(true); 
+                        MRPlan.connect(mro, curMROp);
+                    }
                 } else if (mro.isMapDone() && !mro.isReduceDone()) {
-                    mro.reducePlan.addAsLeaf(str);
-                    mro.setReduceDone(true);
+                    if (combinable && (mro.requestedParallelism >= frJoinFileMergeThreshold)) {
+                        POStore tmpSto = getStore();
+                        FileSpec fSpec = getTempFileSpec();
+                        tmpSto.setSFile(fSpec); 
+                        mro.reducePlan.addAsLeaf(tmpSto);
+                        mro.setReduceDone(true);
+                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
+                        MRPlan.connect(catMROp, curMROp);
+                    } else {
+                        mro.reducePlan.addAsLeaf(str);
+                        mro.setReduceDone(true);
+                        MRPlan.connect(mro, curMROp);
+                    }
                 } else {
                     int errCode = 2022;
                     String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                     throw new PlanException(msg, errCode, PigException.BUG);
-                }
-                MRPlan.connect(compiledInputs[i], curMROp);
+                }              
             }
             
             if (!curMROp.isMapDone()) {
@@ -1147,7 +1201,92 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-
+  
+    @SuppressWarnings("unchecked")
+    private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
+        if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) {
+            return false;
+        }
+        
+        if (mro instanceof NativeMapReduceOper) {
+            // the input of a native MR job is opaque; assume too many
+            // files unless the optimistic setting is on
+            return !frJoinOptimisticFileMerge;
+        }
+               
+        PhysicalPlan mapPlan = mro.mapPlan;
+        
+        List<PhysicalOperator> roots = mapPlan.getRoots();
+        if (roots == null || roots.size() == 0) return false;
+        
+        int numFiles = 0;
+        boolean ret = false;
+        try {
+            for (PhysicalOperator root : roots) {
+                POLoad ld = (POLoad) root;
+                String location = ld.getLFile().getFileName();
+                URI uri = new URI(location);
+                if (uri.getScheme() == null
+                        || uri.getScheme().equalsIgnoreCase("hdfs")) {
+                    Path p = new Path(location);                   
+                    FileSystem fs = p.getFileSystem(conf);
+                    if (fs.exists(p)) {
+                        LoadFunc loader = (LoadFunc) PigContext
+                                .instantiateFuncFromSpec(ld.getLFile()
+                                        .getFuncSpec());
+                        Job job = new Job(conf);
+                        loader.setLocation(location, job);
+                        InputFormat inf = loader.getInputFormat();
+                        List<InputSplit> splits = inf.getSplits(new JobContext(
+                                job.getConfiguration(), job.getJobID()));
+                        List<List<InputSplit>> results = MapRedUtil
+                                .getCombinePigSplits(splits, fs
+                                        .getDefaultBlockSize(), conf);
+                        numFiles += results.size();
+                    } else {
+                        List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
+                        if (preds != null && preds.size() == 1) {
+                            MapReduceOper pred = preds.get(0);
+                            if (!pred.reducePlan.isEmpty()) { 
+                                numFiles += pred.requestedParallelism;  
+                            } else { // map-only job
+                                ret = hasTooManyInputFiles(pred, conf);
+                                break;
+                            }
+                        } else if (!frJoinOptimisticFileMerge) {
+                            // can't determine the number of input files,
+                            // so treat it as having too many files
+                            numFiles = frJoinFileMergeThreshold;
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("failed to get number of input files", e); 
+        } catch (URISyntaxException e) {
+            LOG.warn("failed to get number of input files", e); 
+        } catch (InterruptedException e) {
+            LOG.warn("failed to get number of input files", e); 
+        }
+                
+        LOG.info("number of input files to FR Join: " + numFiles);
+        return ret || numFiles >= frJoinFileMergeThreshold;
+    }
+    
+    /*
+     * Use the multi-file combiner to concatenate small input files
+     */
+    private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str)
+            throws PlanException, ExecException {
+        
+        MapReduceOper mro = startNew(fSpec, old);
+        mro.mapPlan.addAsLeaf(str);
+        mro.setMapDone(true);
+        
+        LOG.info("Insert a concatenate job for FR join");
+                
+        return mro;
+    }
+    
     /** Leftmost relation is referred as base relation (this is the one fed into mappers.)
      *  First, close all MROpers except for first one (referred as baseMROPer)
      *  Then, create a MROper which will do indexing job (idxMROper)

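In short: for each replicated input, if split combination is enabled and either hasTooManyInputFiles() trips the threshold (map-only producer) or the producer's requestedParallelism reaches the threshold (map-reduce producer), the input is first stored to a temp file and a map-only concatenate job is spliced in ahead of the join. An illustrative sketch of the plan change (not actual explain output):

    before:  repl-input job --(replFiles[i])---------------------> FRJoin job
    after:   repl-input job --(tmp file)--> concatenate job
             (map-only, splits combined) --(replFiles[i])--------> FRJoin job
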
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java?rev=991008&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestFRJoin2.java Mon Aug 30 22:19:50 2010
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFRJoin2 {
+
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    
+    private static final String INPUT_DIR = "frjoin";
+    private static final String INPUT_FILE = "input";
+    
+    private static final int FILE_MERGE_THRESHOLD = 5;
+    
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        FileSystem fs = cluster.getFileSystem();
+        fs.mkdirs(new Path(INPUT_DIR));
+        int LOOP_SIZE = 2;
+        for (int i=0; i<FILE_MERGE_THRESHOLD; i++) {        
+            String[] input = new String[2*LOOP_SIZE];
+            for (int n=0; n<LOOP_SIZE; n++) {
+                for (int j=0; j<LOOP_SIZE;j++) {
+                    input[n*LOOP_SIZE + j] = i + "\t" + j;
+                }
+            }
+            Util.createInputFile(cluster, INPUT_DIR + "/part-0000" + i, input);
+        }
+
+        String[] input2 = new String[2*(LOOP_SIZE/2)];
+        int k = 0;
+        for (int i=1; i<=LOOP_SIZE/2; i++) {
+            String si = i + "";
+            for (int j=0; j<=LOOP_SIZE/2; j++) {
+                input2[k++] = si + "\t" + j;
+            }
+        }
+        Util.createInputFile(cluster, INPUT_FILE, input2);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testConcatenateJob() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
+        
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+            
+            pigServer.registerQuery("C = join A by y, B by y using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            
+            // In this case, multi-file-combiner is used so there is no need to add
+            // a concatenate job
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            
+            pigServer.registerQuery("C = join A by y, B by y using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+            
+    @Test
+    public void testTooManyReducers() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+                .getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = group A by x parallel " + FILE_MERGE_THRESHOLD + ";");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+            pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                dbfrj.add(t);               
+            }
+            
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(3, jGraph.size());
+            // find added map-only concatenate job 
+            JobStats js = (JobStats)jGraph.getSuccessors(jGraph.getSources().get(0)).get(0);
+            assertEquals(1, js.getNumberMaps());   
+            assertEquals(0, js.getNumberReduces());   
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            pigServer.registerQuery("D = join C by $0, B by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                Tuple t = iter.next();
+                dbshj.add(t);                
+            }
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }        
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+    @Test
+    public void testUnknownNumMaps() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = Filter A by x < 50;");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+            pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            // In this case, multi-file-combiner is used in grandparent job
+            // so there is no need to add a concatenate job
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(2, jGraph.size());
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            pigServer.registerQuery("C = join A by $0, B by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            assertEquals(2, PigStats.get().getJobGraph().size());
+        }
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+    @Test
+    public void testUnknownNumMaps2() throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        
+        pigServer.registerQuery("A = LOAD '" + INPUT_DIR + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("C = join A by x, B by x using 'repl';");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    MRCompiler.FRJOIN_MERGE_FILES_THRESHOLD, String.valueOf(FILE_MERGE_THRESHOLD));
+            pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+            // In this case, multi-file-combiner is used in grandparent job
+            // so there is no need to add a concatenate job
+            JobGraph jGraph = PigStats.get().getJobGraph();
+            assertEquals(3, jGraph.size());
+        }
+        {
+            pigServer.getPigContext().getProperties().setProperty(
+                    "pig.noSplitCombination", "true");
+            pigServer.registerQuery("D = join B by $0, C by $0 using 'repl';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+            assertEquals(3, PigStats.get().getJobGraph().size());
+        }
+        assertEquals(dbfrj.size(), dbshj.size());
+        assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));    
+    }
+    
+}


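The new suite can be run on its own with the usual trunk build target (assuming the standard ant setup in hadoop/pig/trunk; the exact invocation may vary with the build):

    ant test -Dtestcase=TestFRJoin2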