Author: daijy
Date: Thu Sep  3 00:36:45 2009
New Revision: 810742

URL: http://svn.apache.org/viewvc?rev=810742&view=rev
Log:
PIG-890: Create a sampler interface and improve the skewed join sampler

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Sep  3 00:36:45 2009
@@ -28,11 +28,13 @@
 
 IMPROVEMENTS
 
+PIG-890: Create a sampler interface and improve the skewed join sampler (sriranjan via daijy)
+
 PIG-922: Logical optimizer: push up project part 1 (daijy)
 
 PIG-812: COUNT(*) does not work (breed)
 
-PIG-923: Allow specifying log file location through pig.properties (dvryaboy through daijy)
+PIG-923: Allow specifying log file location through pig.properties (dvryaboy via daijy)
 
 PIG-926: Merge-Join phase 2 (ashutoshc via pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Thu Sep  3 00:36:45 2009
@@ -26,6 +26,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.pig.FuncSpec;
@@ -35,10 +36,15 @@
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
 import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.SampleLoader;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 
 /**
@@ -69,9 +75,10 @@
         return new String[] { file };
     }
 
-    public void init(DataStorage base) throws IOException {
+    @SuppressWarnings("unchecked")
+       public void init(DataStorage base) throws IOException {
         if (parser == null) {
-            loader = new PigStorage();
+               loader = new PigStorage();
         } else {
             try {
                 loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
@@ -95,9 +102,24 @@
             end = Long.MAX_VALUE;
         } else {
             is = fsis;
-        }
+        }       
+           
+        // set the right sample size
+        if (loader instanceof SampleLoader) {
+               try {
+                       PigContext pc = (PigContext) ObjectSerializer.deserialize(((HDataStorage)base).getConfiguration().getProperty("pig.pigContext"));
+                       ArrayList<Pair<FileSpec, Boolean>> inputs = 
+                               (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer.deserialize(((HDataStorage)base).getConfiguration().getProperty("pig.inputs"));
+                       
+                       ((SampleLoader)loader).computeSamples(inputs, pc);
+               } catch (Exception e) {
+                       throw new ExecException(e.getMessage());
+               }
+       }
+        
         loader.bindTo(file.toString(), new BufferedPositionedInputStream(is,
                 start), start, end);
+                
     }
 
     public boolean next(Tuple value) throws IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep  3 00:36:45 2009
@@ -39,6 +39,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.MergeJoinIndexer;
 import org.apache.pig.impl.builtin.TupleSize;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
@@ -1699,7 +1700,7 @@
             }
         }
         
-        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs);
+        return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
     }
     
     /**
@@ -1749,7 +1750,7 @@
                String inputFile = lFile.getFileName();
 
                return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null, 
-                                                       PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile});
+                                                       PartitionSkewedKeys.class.getName(), new String[]{per, mc, inputFile}, PoissonSampleLoader.class.getName());
        }catch(Exception e) {
                throw new PlanException(e);
        }
@@ -1776,21 +1777,25 @@
      * @param sortKeyPlans  PhysicalPlans to be set into POSort operator to 
get sorting keys
      * @param udfClassName  the class name of UDF
      * @param udfArgs   the arguments of UDF
+     * @param sampleLdrClassName class name for the sample loader
      * @return pair<mapreduceoper,integer>
      * @throws PlanException
      * @throws VisitorException
      */
        protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, 
MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
                        FileSpec lFile, FileSpec sampleFile, int rp, 
List<PhysicalPlan> sortKeyPlans, 
-                       String udfClassName, String[] udfArgs ) throws 
PlanException, VisitorException {
+                       String udfClassName, String[] udfArgs, String 
sampleLdrClassName ) throws PlanException, VisitorException {
                
                String[] rslargs = new String[2];
-        // RandomSampleLoader expects string version of FuncSpec 
+        // SampleLoader expects string version of FuncSpec 
         // as its first constructor argument.
+        
         rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
-        rslargs[1] = "100"; // TODO Needs to be determined based on file size.
+        
+        rslargs[1] = "100"; // The value is calculated based on the file size 
for skewed join
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
-            new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+                       new FuncSpec(sampleLdrClassName, rslargs));
+        
         MapReduceOper mro = startNew(quantLdFilName, prevJob);
        
         if(sort.isUDFComparatorUsed) {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Thu Sep  3 00:36:45 2009
@@ -178,7 +178,8 @@
      * per DFS block of the input file. Configures the PigSlice
      * and returns the list of PigSlices as an array
      */
-    public InputSplit[] getSplits(JobConf job, int numSplits)
+    @SuppressWarnings("unchecked")
+       public InputSplit[] getSplits(JobConf job, int numSplits)
             throws IOException {
         ArrayList<Pair<FileSpec, Boolean>> inputs;
                ArrayList<ArrayList<OperatorKey>> inpTargets;
@@ -202,24 +203,24 @@
             try {
                                Path path = new 
Path(inputs.get(i).first.getFileName());
                                 
-                                FileSystem fs;
+                FileSystem fs;
                                 
-                                try {
-                                    fs = path.getFileSystem(job);
-                                } catch (Exception e) {
-                                    // If an application specific
-                                    // scheme was used
-                                    // (e.g.: "hbase://table") we will fail
-                                    // getting the file system. That's
-                                    // ok, we just use the dfs in that case.
-                                    fs = new Path("/").getFileSystem(job);
-                                }
+                try {
+                   fs = path.getFileSystem(job);
+                } catch (Exception e) {
+                   // If an application specific
+                   // scheme was used
+                   // (e.g.: "hbase://table") we will fail
+                   // getting the file system. That's
+                   // ok, we just use the dfs in that case.
+                   fs = new Path("/").getFileSystem(job);
+                }
 
                                // if the execution is against Mapred DFS, set
                                // working dir to /user/<userid>
                                if(pigContext.getExecType() == 
ExecType.MAPREDUCE) {
                                    fs.setWorkingDirectory(new Path("/user", 
job.getUser()));
-                                }
+                }
                                
                                DataStorage store = new 
HDataStorage(ConfigurationUtil.toProperties(job));
                                ValidatingInputFileSpec spec;
@@ -244,6 +245,9 @@
                                throw new ExecException(msg, errCode, 
PigException.BUG, e);
                        }
         }
+        // set the number of map tasks
+        pigContext.getProperties().setProperty("pig.mapsplits.count", 
Integer.toString(splits.size()));
+        job.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
         return splits.toArray(new SliceWrapper[splits.size()]);
     }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 Thu Sep  3 00:36:45 2009
@@ -81,7 +81,7 @@
         POLoad load = (POLoad)po;
         String loadFunc = load.getLFile().getFuncName();
         String loadFile = load.getLFile().getFileName();
-        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc))) {
+        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc)) && 
!("org.apache.pig.impl.builtin.SkewedJoinSampleLoader".equals(loadFunc))) {
             log.debug("Not a sampling job.");
             return;
         }
@@ -178,7 +178,7 @@
         rslargs[0] = predFs.getFuncSpec().toString();
         // Second argument is the number of samples per block, read this from 
the original.
         rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1];
-        FileSpec fs = new FileSpec(predFs.getFileName(),new 
FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+        FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, 
rslargs));
         POLoad newLoad = new 
POLoad(load.getOperatorKey(),load.getRequestedParallelism(), fs, 
load.isSplittable());
         try {
             mr.mapPlan.replace(load, newLoad);

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
(added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java 
Thu Sep  3 00:36:45 2009
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Currently skipInterval is similar to the RandomSampleLoader. However, if we were to use a
+ * uniform distribution, we could precompute the intervals and read them from a file.
+ *
+ */
+public class PoissonSampleLoader extends SampleLoader {
+       
+       // Base number of samples needed
+       private long baseNumSamples;
+       
+       /// Count of the map splits
+       private static final String MAPSPLITS_COUNT = "pig.mapsplits.count";
+       
+       /// Conversion factor accounts for the various encodings, compression 
etc
+       private static final String CONV_FACTOR = 
"pig.inputfile.conversionfactor";
+       
+       /// For a given mean and a confidence, a sample rate is obtained from a 
poisson cdf
+       private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
+       
+       /// % of memory available for the input data. This is currently equal to the memory available
+       /// for the skewed join
+       private static final String PERC_MEM_AVAIL = 
"pig.skewedjoin.reduce.memusage";
+       
+       // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
+       // set to 10 (empirically, minimum number of samples) and the confidence set to 95%
+       private static final int DEFAULT_SAMPLE_RATE = 17;
+       
+       // By default the data is multiplied by 2 to account for the encoding
+       private static final int DEFAULT_CONV_FACTOR = 2;
+       
+
+       public PoissonSampleLoader(String funcSpec, String ns) {
+               super(funcSpec);
+               super.setNumSamples(Integer.valueOf(ns)); // will be overridden
+       }
+       
+       // n is the number of map tasks
+       @Override
+       public void setNumSamples(int n) {
+               super.setNumSamples(n); // will be overridden
+       }
+       
+       /**
+        * Computes the number of samples for the loader
+        * 
+        * @param inputs : Set to pig inputs
+        * @param pc : PigContext object
+        * 
+        */
+       @Override
+       public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, 
PigContext pc) throws ExecException {
+               int numSplits, convFactor, sampleRate;
+               Properties pcProps = pc.getProperties();
+               
+               // Set default values for the various parameters
+               try {
+                       numSplits = 
Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT));
+               } catch (NumberFormatException e) {
+                       String msg = "Couldn't retrieve the number of maps in 
the job";
+                       throw new ExecException(msg);
+               }
+               
+               try {
+                       convFactor = 
Integer.valueOf(pcProps.getProperty(CONV_FACTOR));
+               } catch (NumberFormatException e) {
+                       convFactor = DEFAULT_CONV_FACTOR;
+               }
+               
+               try {
+                       sampleRate = 
Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
+               } catch (NumberFormatException e) {
+                       sampleRate = DEFAULT_SAMPLE_RATE;
+               }
+               
+               // % of memory available for the records
+               float heapPerc = 
Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+               
+               // we are only concerned with the first input for skewed join
+               String fname = inputs.get(0).first.getFileName();
+               
+               // calculate the base number of samples
+               try {
+                       float f = (Runtime.getRuntime().maxMemory() * heapPerc) 
/ (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
+                       baseNumSamples = (long) Math.ceil(1.0 / f);
+               } catch (IOException e) {
+                       int errCode = 2175;
+                       String msg = "Internal error. Could not retrieve file 
size for the sampler.";
+                       throw new ExecException(msg, errCode, PigException.BUG);
+               } catch (ArithmeticException e) {
+                       int errCode = 1105;
+                       String msg = "Heap percentage / Conversion factor 
cannot be set to 0";
+                       throw new ExecException(msg,errCode,PigException.INPUT);
+               }
+               
+               // set the number of samples
+               int n = (int) ((baseNumSamples * sampleRate) / numSplits);
+               
+               // set the minimum number of samples to 1
+               n = (n > 1) ? n : 1;
+               setNumSamples(n);
+       }
+       
+
+}

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=810742&r1=810741&r2=810742&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java 
Thu Sep  3 00:36:45 2009
@@ -17,33 +17,13 @@
  */
 package org.apache.pig.impl.builtin;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
 /**
  * A loader that samples the data.  This loader can subsume loader that
  * can handle starting in the middle of a record.  Loaders that can
  * handle this should implement the SamplableLoader interface.
  */
-public class RandomSampleLoader implements LoadFunc {
-    
-    private int numSamples;
-    private long skipInterval;    
-       private TupleFactory factory;
-    protected SamplableLoader loader;
-    
+public class RandomSampleLoader extends SampleLoader {
+ 
     /**
      * Construct with a class of loader to use.
      * @param funcSpec func spec of the loader to use.
@@ -53,107 +33,17 @@
      * UDF constructors.
      */
     public RandomSampleLoader(String funcSpec, String ns) {
-        loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
-        numSamples = Integer.valueOf(ns);
+       // instantiate the loader
+        super(funcSpec);
+        // set the number of samples
+        super.setNumSamples(Integer.valueOf(ns));
     }
     
-
-    public void bindTo(String fileName, BufferedPositionedInputStream is, long 
offset, long end) throws IOException {        
-       skipInterval = (end - offset)/numSamples;
-        loader.bindTo(fileName, is, offset, end);
-    }
     
-
-    public Tuple getNext() throws IOException {
-        long initialPos = loader.getPosition();
-        
-        // make sure we move to a boundry of a record
-        Tuple t = loader.getSampledTuple();        
-        long middlePos = loader.getPosition();
-        
-        // we move to next boundry
-        t = loader.getSampledTuple();        
-        long finalPos = loader.getPosition();
-        
-        long toSkip = skipInterval - (finalPos - initialPos);
-        if (toSkip > 0) {
-            long rc = loader.skip(toSkip);
-            
-            // if we did not skip enough
-            // in the first attempt, call
-            // in.skip() repeatedly till we
-            // skip enough
-            long remainingSkip = toSkip - rc;
-            while(remainingSkip > 0) {
-                rc = loader.skip(remainingSkip);
-                if(rc == 0) {
-                    // underlying stream saw EOF
-                    break;
-                }
-                remainingSkip -= rc;
-            }
-        }       
-        
-        if (t == null) {
-               return null;
-        }
-        
-        if (factory == null) {
-               factory = TupleFactory.getInstance();
-        }
-
-        // copy existing field 
-        Tuple m = factory.newTuple(t.size()+1);
-        for(int i=0; i<t.size(); i++) {
-               m.set(i, t.get(i));
-        }
-        
-        // add size of the tuple at the end
-        m.set(t.size(), (finalPos-middlePos));
-        
-        return m;
-    }
-    
-    public Integer bytesToInteger(byte[] b) throws IOException {
-        return loader.bytesToInteger(b);
-    }
-
-    public Long bytesToLong(byte[] b) throws IOException {
-        return loader.bytesToLong(b);
-    }
-
-    public Float bytesToFloat(byte[] b) throws IOException {
-        return loader.bytesToFloat(b);
-    }
-
-    public Double bytesToDouble(byte[] b) throws IOException {
-        return loader.bytesToDouble(b);
-    }
-
-    public String bytesToCharArray(byte[] b) throws IOException {
-        return loader.bytesToCharArray(b);
-    }
-
-    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
-        return loader.bytesToMap(b);
-    }
-
-    public Tuple bytesToTuple(byte[] b) throws IOException {
-        return loader.bytesToTuple(b);
-    }
-
-    public DataBag bytesToBag(byte[] b) throws IOException {
-        return loader.bytesToBag(b);
-    }
-
-    public void fieldsToRead(Schema schema) {
-        loader.fieldsToRead(schema);
-    }
-
-    public Schema determineSchema(
-            String fileName,
-            ExecType execType,
-            DataStorage storage) throws IOException {
-        return loader.determineSchema(fileName, execType, storage);
+    @Override
+    public void setNumSamples(int n) {
+       // Setting it to 100 as default for order by
+       super.setNumSamples(100);
     }
+ 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Sep  
3 00:36:45 2009
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Abstract class that specifies the interface for sample loaders
+ *
+ */
+public abstract class SampleLoader implements LoadFunc {
+
+       protected int numSamples;
+       protected long skipInterval;
+    protected SamplableLoader loader;
+       private TupleFactory factory;
+
+    
+    public SampleLoader(String funcSpec) {
+       loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
+    }
+    
+    public void setNumSamples(int n) {
+       numSamples = n;
+    }
+    
+    public int getNumSamples() {
+       return numSamples;
+    }
+    
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, 
org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
+        */
+       public void bindTo(String fileName, BufferedPositionedInputStream is,
+                       long offset, long end) throws IOException {
+        skipInterval = (end - offset)/numSamples;
+        loader.bindTo(fileName, is, offset, end);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
+        */
+       public DataBag bytesToBag(byte[] b) throws IOException {
+        return loader.bytesToBag(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
+        */
+       public String bytesToCharArray(byte[] b) throws IOException {
+        return loader.bytesToCharArray(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
+        */
+       public Double bytesToDouble(byte[] b) throws IOException {
+        return loader.bytesToDouble(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
+        */
+       public Float bytesToFloat(byte[] b) throws IOException {
+        return loader.bytesToFloat(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
+        */
+       public Integer bytesToInteger(byte[] b) throws IOException {
+               return loader.bytesToInteger(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
+        */
+       public Long bytesToLong(byte[] b) throws IOException {
+        return loader.bytesToLong(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
+        */
+       public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        return loader.bytesToMap(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
+        */
+       public Tuple bytesToTuple(byte[] b) throws IOException {
+        return loader.bytesToTuple(b);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, 
org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+        */
+       public Schema determineSchema(String fileName, ExecType execType,
+                       DataStorage storage) throws IOException {
+        return loader.determineSchema(fileName, execType, storage);
+       }
+
+       /* (non-Javadoc)
+        * @see 
org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
+        */
+       public void fieldsToRead(Schema schema) {
+        loader.fieldsToRead(schema);
+       }
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#getNext()
+        */
+       public Tuple getNext() throws IOException {
+       long initialPos = loader.getPosition();
+        
+        // make sure we move to a boundary of a record
+        Tuple t = loader.getSampledTuple();        
+        long middlePos = loader.getPosition();
+        
+        // we move to the next boundary
+        t = loader.getSampledTuple();        
+        long finalPos = loader.getPosition();
+        
+        long toSkip = skipInterval - (finalPos - initialPos);
+        if (toSkip > 0) {
+            long rc = loader.skip(toSkip);
+            
+            // if we did not skip enough
+            // in the first attempt, call
+            // in.skip() repeatedly till we
+            // skip enough
+            long remainingSkip = toSkip - rc;
+            while(remainingSkip > 0) {
+                rc = loader.skip(remainingSkip);
+                if(rc == 0) {
+                    // underlying stream saw EOF
+                    break;
+                }
+                remainingSkip -= rc;
+            }
+        }       
+        
+        if (t == null) {
+               return null;
+        }
+        
+               if (factory == null) {
+               factory = TupleFactory.getInstance();
+        }
+
+        // copy existing field 
+        Tuple m = factory.newTuple(t.size()+1);
+        for(int i=0; i<t.size(); i++) {
+               m.set(i, t.get(i));
+        }
+        
+        // add size of the tuple at the end
+        m.set(t.size(), (finalPos-middlePos));
+        
+        return m;              
+       }
+
+       public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, 
PigContext pc) throws ExecException {
+               // TODO Auto-generated method stub
+               
+       }
+
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java?rev=810742&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java 
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPoissonSampleLoader.java Thu 
Sep  3 00:36:45 2009
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.ArrayList;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.apache.pig.impl.io.FileSpec;
+
+
+public class TestPoissonSampleLoader extends TestCase{
+    private static final String INPUT_FILE1 = "SkewedJoinInput1.txt";
+    // pigServer is built in the constructor from the MiniCluster's properties.
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    // Creates the PigServer in MAPREDUCE mode, then sets four properties on its PigContext.
+    public TestPoissonSampleLoader() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple",
 "5");     
+        
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage",
 "0.0001");
+        
pigServer.getPigContext().getProperties().setProperty("mapred.child.java.opts", 
"-Xmx512m");
+
+        
pigServer.getPigContext().getProperties().setProperty("pig.mapsplits.count", 
"5");
+    }
+    // NOTE(review): this class extends junit.framework.TestCase, which calls setUp/tearDown by name; @Before/@After look redundant - confirm.
+    // setUp delegates to createFiles() to prepare the input file.
+    @Before
+    public void setUp() throws Exception {
+        createFiles();
+    }
+    // Writes INPUT_FILE1 locally (100 iterations x 3 rows = 300 lines), then hands it to Util.copyFromLocalToCluster.
+    private void createFiles() throws IOException {
+       PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE1));
+       // Each iteration writes three tab-separated rows; k is a running suffix (0..299).
+       int k = 0;
+       for(int j=0; j<100; j++) {                              
+               w.println("100\tapple1\taaa" + k);
+           k++;
+           w.println("200\torange1\tbbb" + k);
+           k++;
+           w.println("300\tstrawberry\tccc" + k);
+           k++;                            
+       }
+       
+       w.close();
+       // Presumably copies the local file to the cluster under the same name - confirm against Util.
+       Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
+    }
+    
+    // tearDown deletes the local input file and asks Util to delete INPUT_FILE1 on the cluster.
+    @After
+    public void tearDown() throws Exception {
+       new File(INPUT_FILE1).delete();
+       
+        Util.deleteFile(cluster, INPUT_FILE1);
+    }
+    
+    // Calls computeSamples on a PoissonSampleLoader for INPUT_FILE1 and fails unless getNumSamples() is 3.
+    public void testComputeSamples() throws IOException{
+               FileSpec fs = new FileSpec(INPUT_FILE1, new 
FuncSpec(PigStorage.class.getName()));
+               
+               ArrayList<Pair<FileSpec, Boolean>> inputs = new 
ArrayList<Pair<FileSpec, Boolean> >();
+               inputs.add(new Pair<FileSpec, Boolean>(fs, true));
+               
+        // Use "100" as a default value for the loader's second constructor argument
+        PoissonSampleLoader ps = new PoissonSampleLoader((new 
FuncSpec(PigStorage.class.getName())).toString(), "100");
+
+        // Compute the number of samples for the input file
+        ps.computeSamples(inputs, pigServer.getPigContext());
+        // Exactly 3 samples are expected for this input.
+        if (ps.getNumSamples() != 3) {
+               fail("Compute samples returned the wrong number of samples");
+        }
+    }
+       
+}
\ No newline at end of file


Reply via email to