Author: olga
Date: Mon Dec 21 19:32:26 2009
New Revision: 892939

URL: http://svn.apache.org/viewvc?rev=892939&view=rev
Log:
PIG-1143: Poisson Sample Loader should compute the number of samples required
only once (sriranjan via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Dec 21 19:32:26 2009
@@ -65,6 +65,9 @@
 
 BUG FIXES
 
+PIG-1143: Poisson Sample Loader should compute the number of samples required
+only once (sriranjan via olgan)
+
 PIG-1157: Sucessive replicated joins do not generate Map Reduce plan and fails
 due to OOM (rding via olgan)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Mon Dec 21 19:32:26 2009
@@ -1845,7 +1845,7 @@
                String inputFile = lFile.getFileName();
 
                return getSamplingJob(sort, prevJob, transformPlans, lFile, 
sampleFile, rp, null, 
-                                                       
PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, 
RandomSampleLoader.class.getName());
+                                                       
PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, 
PoissonSampleLoader.class.getName());
        }catch(Exception e) {
                throw new PlanException(e);
        }

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Mon Dec 21 19:32:26 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,11 +51,14 @@
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SliceWrapper;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.ValidatingInputFileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 
 public class PigInputFormat implements InputFormat<Text, Tuple>,
        JobConfigurable {
@@ -239,6 +243,20 @@
                                if ((spec.getSlicer() instanceof PigSlicer)) {
                                    
((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
                                }
+                                               
+                /* Set the input size in UDF Context if LoadFunc is a sample 
loader.
+                 * This value is used by PoissonSampleLoader to calculate the 
number of 
+                 * samplable tuples
+                 */
+                Object loader = 
PigContext.instantiateFuncFromSpec(spec.getFuncSpec());
+                if (loader instanceof SampleLoader) {
+                    Long iSize = FileLocalizer.getSize(spec.getFileName(), 
pigContext.getProperties());                     
+                    UDFContext udfc = UDFContext.getUDFContext();
+                    Properties p = udfc.getUDFProperties(SampleLoader.class);
+                    p.setProperty("pig.input." + i + ".size", 
Long.toString(iSize));
+                    udfc.serialize(job);
+                }
+                       
                                Slice[] pigs = spec.getSlicer().slice(store, 
spec.getFileName());
                                for (Slice split : pigs) {
                                    splits.add(new SliceWrapper(split, 
pigContext.getExecType(), i, fs, inpTargets.get(i)));

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
 Mon Dec 21 19:32:26 2009
@@ -28,6 +28,7 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 import java.util.List;
 
@@ -51,6 +52,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
@@ -141,6 +143,8 @@
         
         wrapped.init(store);
         
+        udfc.serialize(job);
+
         job.set("map.target.ops", ObjectSerializer.serialize(targetOps));
         // Mimic org.apache.hadoop.mapred.FileSplit if feasible...
         String[] locations = wrapped.getLocations();

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
Mon Dec 21 19:32:26 2009
@@ -32,6 +32,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 
 /**
@@ -85,6 +86,7 @@
         */
        @Override
        public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, 
PigContext pc) throws ExecException {
+           
                int numSplits, convFactor, sampleRate;
                Properties pcProps = pc.getProperties();
                
@@ -110,25 +112,23 @@
                
                // % of memory available for the records
                float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-                if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
-                   try {
-                        heapPerc = 
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
-                    }catch(NumberFormatException e) {
-                       // ignore, use default value
-                    }
-                }
-               
-               // we are only concerned with the first input for skewed join
-               String fname = inputs.get(0).first.getFileName();
-               
-               // calculate the base number of samples
+        if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+            try {
+                heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+            } catch (NumberFormatException e) {
+                // ignore, use default value
+            }
+        }
+
+        // We are concerned with the size of the first input. In case of globs 
/ directories, 
+        // this size is the total size of all the files present in them
+        Properties p = 
UDFContext.getUDFContext().getUDFProperties(SampleLoader.class);
+        Long iSize = Long.valueOf((String) p.get("pig.input.0.size"));
+
+        // calculate the base number of samples
                try {
-                       float f = (Runtime.getRuntime().maxMemory() * heapPerc) 
/ (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
+                   float f = (Runtime.getRuntime().maxMemory() * heapPerc) / 
(float) (iSize * convFactor);
                        baseNumSamples = (long) Math.ceil(1.0 / f);
-               } catch (IOException e) {
-                       int errCode = 2175;
-                       String msg = "Internal error. Could not retrieve file 
size for the sampler.";
-                       throw new ExecException(msg, errCode, PigException.BUG);
                } catch (ArithmeticException e) {
                        int errCode = 1105;
                        String msg = "Heap percentage / Conversion factor 
cannot be set to 0";
@@ -141,6 +141,7 @@
                // set the minimum number of samples to 1
                n = (n > 1) ? n : 1;
                setNumSamples(n);
+
        }
        
        @Override

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Mon 
Dec 21 19:32:26 2009
@@ -30,18 +30,24 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.builtin.SampleLoader;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.After;
 import org.junit.Before;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 
+import java.util.Properties;
+
 
 public class TestPoissonSampleLoader extends TestCase{
     private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
@@ -96,6 +102,8 @@
                
                ArrayList<Pair<FileSpec, Boolean>> inputs = new 
ArrayList<Pair<FileSpec, Boolean> >();
                inputs.add(new Pair<FileSpec, Boolean>(fs, true));
+               Properties p = 
UDFContext.getUDFContext().getUDFProperties(SampleLoader.class);
+               p.setProperty("pig.input.0.size", 
Long.toString(Util.getSize(cluster, INPUT_FILE1)));
                
         // Use 100 as a default value;
         PoissonSampleLoader ps = new PoissonSampleLoader((new 
FuncSpec(PigStorage.class.getName())).toString(), "100");
@@ -104,7 +112,7 @@
         ps.computeSamples(inputs, pigServer.getPigContext());
         
         if (ps.getNumSamples() != 3) {
-               fail("Compute samples returned the wrong number of samples");
+               fail("Compute samples returned the wrong number of samples: " + 
ps.getNumSamples() + " instead of 3");
         }
     }
        

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Dec 21 
19:32:26 2009
@@ -24,6 +24,10 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -444,4 +448,61 @@
         Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));       
        
     }
+    
+    /* Test to check if the samplers sample different input files in the case 
of
+     * serial successive joins
+     */
+    public void testSuccessiveJoins() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as 
(id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as 
(id,name);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE3 + "' as 
(id,name);");
+
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A by id, B by id using 
\"skewed\";");
+            pigServer.registerQuery("E = join D by A::id, C by id using 
\"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A by id, B by id;");
+            pigServer.registerQuery("E = join D by A::id, C by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+    }
+    
+    public void testMultiQuery() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as 
(id,name);");
+        pigServer.registerQuery("B = FILTER A by id == 100;");
+        pigServer.registerQuery("C = FILTER A by id == 200;");
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = 
BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join B by id, C by id using 
\"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join B by id, C by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbrj));
+    }
+    
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=892939&r1=892938&r2=892939&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Mon Dec 21 19:32:26 2009
@@ -242,6 +242,23 @@
     }
     
     /**
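+     * Helper to return the size of a file on the MiniCluster dfs.
+     * 
+     * @param miniCluster reference to the MiniCluster that holds the file
+     * @param fileName pathname of the file whose size is returned
+     * @return the length of the file as reported by the file system
+     * @throws IOException if the file does not exist on the minicluster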
+     */
+    static public long getSize(MiniCluster miniCluster, String fileName) 
+    throws IOException {
+        FileSystem fs = miniCluster.getFileSystem();
+        Path p = new Path(fileName);
+        if(!fs.exists(p)) {
+            throw new IOException("File " + fileName + " does not exist on the 
minicluster");
+        }
+        return fs.getFileStatus(p).getLen();
+    }
+    
+    /**
      * Helper to create a dfs file on the MiniCluster dfs. This returns an
      * outputstream that can be used in test cases to write data.
      * 


Reply via email to