Author: gates
Date: Tue Jun 30 01:10:29 2009
New Revision: 789523

URL: http://svn.apache.org/viewvc?rev=789523&view=rev
Log:
PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending 
BinStorage.  Added a new SamplableLoader interface that loaders can implement
to allow them to be used by RandomSampleLoader.


Added:
    hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 30 01:10:29 2009
@@ -26,6 +26,10 @@
 
 IMPROVEMENTS
 
+PIG-820: Change RandomSampleLoader to take a LoadFunc instead of extending
+               BinStorage.  Added new SamplableLoader interface for loaders to implement,
+               allowing them to be used by RandomSampleLoader (ashutoshc via gates).
+
 PIG-832: Make import list configurable (daijy)
 
 PIG-697: Proposed improvements to pig's optimizer (sms)

Added: hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java?rev=789523&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/SamplableLoader.java Tue Jun 30 
01:10:29 2009
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+import java.io.IOException;
+
+/**
+ * Implementing this interface indicates to Pig that a given loader can be
+ * used by a sampling loader.  The requirement for this is that the loader
+ * can handle a getNext() call without knowing the position in the file.
+ * This will not be the case for loaders that handle structured data such
+ * as XML, where they must start at the beginning of the file in order to
+ * understand their position.  Record oriented loaders such as PigStorage
+ * can handle this by seeking to the next record delimiter and starting
+ * from that point.  Another requirement is that the loader be able to
+ * skip or seek in its input stream.
+ */
+public interface SamplableLoader extends LoadFunc {
+    
+    /**
+     * Skip ahead in the input stream.
+     * @param n number of bytes to skip
+     * @return number of bytes actually skipped.  The return semantics are
+     * exactly the same as {@link java.io.InputStream#skip(long)}.
+     * @throws IOException if the underlying input stream reports an I/O error
+     */
+    public long skip(long n) throws IOException;
+    
+    /**
+     * Get the current position in the stream.
+     * @return position in the stream.
+     * @throws IOException if the underlying input stream reports an I/O error
+     */
+    public long getPosition() throws IOException;
+}

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Tue Jun 30 01:10:29 2009
@@ -507,6 +507,13 @@
     }
     
     
+    /**
+     * Force an end to the current map reduce job with a store into a temporary
+     * file.
+     * @param fSpec Temp file to force a store into.
+     * @return MR operator that now is finished with a store.
+     * @throws PlanException
+     */
     private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws 
PlanException{
         if(compiledInputs.length>1) {
             int errCode = 2023;
@@ -1231,8 +1238,20 @@
         return mro;
     }
 
-    public Pair<MapReduceOper,Integer> getQuantileJob(POSort inpSort, 
MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, 
Pair<Integer,Byte>[] fields) throws PlanException, VisitorException {
-        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(), new 
FuncSpec(RandomSampleLoader.class.getName()));
+    public Pair<MapReduceOper,Integer> getQuantileJob(
+            POSort inpSort,
+            MapReduceOper prevJob,
+            FileSpec lFile,
+            FileSpec quantFile,
+            int rp,
+            Pair<Integer,Byte>[] fields) throws PlanException, 
VisitorException {
+        String[] rslargs = new String[2];
+        // RandomSampleLoader expects string version of FuncSpec 
+        // as its first constructor argument.
+        rslargs[0] = (new FuncSpec(BinStorage.class.getName())).toString();
+        rslargs[1] = "100"; // TODO Needs to be determined based on file size.
+        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
+            new FuncSpec(RandomSampleLoader.class.getName(), rslargs));
         MapReduceOper mro = startNew(quantLdFilName, prevJob);
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Tue Jun 30 01:10:29 2009
@@ -315,6 +315,11 @@
             co.getMessageCollector().logMessages(MessageType.Warning, 
aggregateWarning, log);
         }
         
+        // Optimize the jobs that have a load/store only first MR job followed
+        // by a sample job.
+        SampleOptimizer so = new SampleOptimizer(plan);
+        so.visit();
+        
         // optimize key - value handling in package
         POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
         pkgAnnotator.visit();

Added: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=789523&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
 Tue Jun 30 01:10:29 2009
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to optimize plans that have a sample job that immediately follows 
a
+ * load/store only MR job.  These kinds of plans are generated for order bys, 
and
+ * will soon be generated for joins that need to sample their data first.  
These
+ * can be changed so that the RandomSampleLoader subsumes the loader used in 
the
+ * first job and then removes the first job.
+ */
+public class SampleOptimizer extends MROpPlanVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    public SampleOptimizer(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    private List<MapReduceOper> opsToRemove = new ArrayList<MapReduceOper>();
+
+    @Override
+    public void visit() throws VisitorException {
+        
+        super.visit();
+        // remove operators marked for removal
+        for (MapReduceOper op : opsToRemove) 
+            this.mPlan.remove(op);
+    }
+
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // See if this is a sampling job.
+        List<PhysicalOperator> pos = mr.mapPlan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            log.debug("Map of operator empty");
+            return;
+        }
+        PhysicalOperator po = pos.get(0);
+        if (!(po instanceof POLoad)) {
+            log.debug("Root operator of map is not load.");
+            return; // Huh?
+        }
+        POLoad load = (POLoad)po;
+        String loadFunc = load.getLFile().getFuncName();
+        String loadFile = load.getLFile().getFileName();
+        if 
(!("org.apache.pig.impl.builtin.RandomSampleLoader".equals(loadFunc))) {
+            log.debug("Not a sampling job.");
+            return;
+        }
+        if (loadFile == null) {
+            log.debug("No load file");
+            return;
+        }
+
+        // Get this job's predecessor.  There should be exactly one.;
+        List<MapReduceOper> preds = mPlan.getPredecessors(mr);
+        if (preds.size() != 1) {
+            log.debug("Too many predecessors to sampling job.");
+            return;
+        }
+        MapReduceOper pred = preds.get(0);
+
+        // The predecessor should be a root.
+        List<MapReduceOper> predPreds = mPlan.getPredecessors(pred);
+        if (predPreds != null && predPreds.size() > 0) {
+            log.debug("Predecessor should be a root of the plan");
+            return; 
+        }
+
+        // The predecessor should have just a load and store in the map, and 
nothing
+        // in the combine or reduce.
+        if ( !(pred.reducePlan.isEmpty() && pred.combinePlan.isEmpty())) {
+            log.debug("Predecessor has a combine or reduce plan");
+            return;
+        }
+
+        if (pred.mapPlan == null || pred.mapPlan.size() != 2) {
+            log.debug("Predecessor has more than just load+store in the map");
+            return;
+        }
+
+        List<PhysicalOperator> loads = pred.mapPlan.getRoots();
+        if (loads.size() != 1) {
+            log.debug("Predecessor plan has more than one root.");
+            return;
+        }
+        PhysicalOperator r = loads.get(0);
+        if (!(r instanceof POLoad)) { // Huh?
+            log.debug("Predecessor's map plan root is not a load.");
+            return;
+        }
+        POLoad predLoad = (POLoad)r;
+        LoadFunc lf = 
(LoadFunc)PigContext.instantiateFuncFromSpec(predLoad.getLFile().getFuncSpec());
+        if (!(lf instanceof SamplableLoader)) {
+            log.debug("Predecessor's loader does not implement 
SamplableLoader");
+            return;
+        }
+
+        // The MR job should have one successor.
+        List<MapReduceOper> succs = mPlan.getSuccessors(mr);
+        if (succs.size() != 1) {
+            log.debug("Job has more than one successor.");
+            return;
+        }
+        MapReduceOper succ = succs.get(0);
+
+        // Find the load the correlates with the file the sampler is loading, 
and
+        // check that it is using BinaryStorage.
+        if (succ.mapPlan == null) { // Huh?
+            log.debug("Successor has no map plan.");
+            return;
+        }
+        loads = succ.mapPlan.getRoots();
+        POLoad succLoad = null;
+        for (PhysicalOperator root : loads) {
+            if (!(root instanceof POLoad)) { // Huh?
+                log.debug("Successor's roots are not loads");
+                return;
+            }
+            POLoad sl = (POLoad)root;
+            if (loadFile.equals(sl.getLFile().getFileName()) && 
+                    
"org.apache.pig.builtin.BinStorage".equals(sl.getLFile().getFuncName())) {
+                succLoad = sl;
+                break;
+            }
+        }
+
+        if (succLoad == null) {
+            log.debug("Could not find load that matched file we are 
sampling.");
+            return;
+        }
+
+        // Okay, we're on.
+        // First, replace our RandomSampleLoader with a RandomSampleLoader 
that uses
+        // the load function from our predecessor.
+        String[] rslargs = new String[2];
+        FileSpec predFs = predLoad.getLFile();
+        // First argument is FuncSpec of loader function to subsume, this we 
want to set for
+        // ourselves.
+        rslargs[0] = predFs.getFuncSpec().toString();
+        // Second argument is the number of samples per block, read this from 
the original.
+        rslargs[1] = load.getLFile().getFuncSpec().getCtorArgs()[1];
+        FileSpec fs = new FileSpec(predFs.getFileName(),new 
FuncSpec(RandomSampleLoader.class.getName(), rslargs));
+        POLoad newLoad = new 
POLoad(load.getOperatorKey(),load.getRequestedParallelism(), fs, 
load.isSplittable());
+        try {
+            mr.mapPlan.replace(load, newLoad);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+
+        // Second, replace the loader in our successor with whatever the 
originally used loader was.
+        fs = new FileSpec(predFs.getFileName(), predFs.getFuncSpec());
+        newLoad = new POLoad(succLoad.getOperatorKey(), 
succLoad.getRequestedParallelism(), fs, succLoad.isSplittable());
+        try {
+            succ.mapPlan.replace(succLoad, newLoad);
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+
+        // Cannot delete the pred right now, because we are still traversing 
the graph. So, mark the pred and remove it from the
+        // plan once the visit by this optimizer is complete.
+        opsToRemove.add(pred);
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Jun 30 
01:10:29 2009
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.URL;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -35,8 +34,8 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.SamplableLoader;
 import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataReaderWriter;
@@ -44,14 +43,10 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.util.WrappedIOException;
 
-
-public class BinStorage implements ReversibleLoadStoreFunc {
+public class BinStorage implements ReversibleLoadStoreFunc, SamplableLoader {
     public static final byte RECORD_1 = 0x01;
     public static final byte RECORD_2 = 0x02;
     public static final byte RECORD_3 = 0x03;
@@ -68,6 +63,16 @@
     public BinStorage() {
     }
 
+    /**
+     * Skip ahead in the input by delegating to the {@code in} stream.
+     * @param n number of bytes to skip
+     * @return number of bytes the stream actually skipped
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        final long skipped = in.skip(n);
+        return skipped;
+    }
+
+    /**
+     * Report the current offset of the {@code in} stream.
+     * @return position in the stream
+     */
+    @Override
+    public long getPosition() throws IOException {
+        final long position = in.getPosition();
+        return position;
+    }
+    
     public Tuple getNext() throws IOException {
         
         byte b = 0;

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Jun 30 
01:10:29 2009
@@ -20,9 +20,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Iterator;
 
@@ -30,9 +28,8 @@
 import org.apache.commons.logging.Log;
 
 import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.StoreFunc;
+import org.apache.pig.SamplableLoader;
 import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -40,7 +37,6 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
@@ -50,7 +46,7 @@
  * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for 
more information.
  */
 public class PigStorage extends Utf8StorageConverter
-        implements ReversibleLoadStoreFunc {
+        implements ReversibleLoadStoreFunc, SamplableLoader {
     protected BufferedPositionedInputStream in = null;
     protected final Log mLog = LogFactory.getLog(getClass());
         
@@ -101,6 +97,16 @@
         }
     }
 
+    /**
+     * Skip ahead in the input by delegating to the {@code in} stream.
+     * Note: {@code in} is initialised to null (see its declaration), so this
+     * must only be called once the loader has an input stream.
+     * @param n number of bytes to skip
+     * @return number of bytes the stream actually skipped
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        final long skipped = in.skip(n);
+        return skipped;
+    }
+
+    /**
+     * Report the current offset of the {@code in} stream.
+     * @return position in the stream
+     */
+    @Override
+    public long getPosition() throws IOException {
+        final long position = in.getPosition();
+        return position;
+    }
+
     public Tuple getNext() throws IOException {
         if (in == null || in.getPosition() > end) {
             return null;
@@ -336,5 +342,4 @@
         return null;
     }
 
-
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/Tuple.java Tue Jun 30 01:10:29 2009
@@ -36,6 +36,10 @@
  *
  * Fields are numbered from 0.
  */
+
+// Put in to make the compiler not complain about WritableComparable
+// being a generic type.
+@SuppressWarnings("unchecked")
 public interface Tuple extends WritableComparable, Serializable {
        
     /**

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java 
Tue Jun 30 01:10:29 2009
@@ -18,37 +18,58 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.util.Map;
 
-import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 
-public class RandomSampleLoader extends BinStorage {
+/**
+ * A loader that samples its input.  It subsumes (wraps) another loader that
+ * can start reading in the middle of a record.  Loaders that can handle this
+ * should implement the SamplableLoader interface.
+ */
+public class RandomSampleLoader implements LoadFunc {
     
-    public static int defaultNumSamples = 100;
-    int numSamples = defaultNumSamples;
+    private int numSamples;
     private long skipInterval;
+    private SamplableLoader loader;
+    
+    /**
+     * Construct with a class of loader to use.
+     * @param funcSpec func spec of the loader to use.
+     * @param ns Number of samples per map to collect. 
+     * Arguments are passed as strings instead of actual types (FuncSpec, int) 
+     * because FuncSpec only supports string arguments to
+     * UDF constructors.
+     */
+    public RandomSampleLoader(String funcSpec, String ns) {
+        loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
+        numSamples = Integer.valueOf(ns);
+    }
     
     @Override
     public void bindTo(String fileName, BufferedPositionedInputStream is, long 
offset, long end) throws IOException {
         skipInterval = (end - offset)/numSamples;
-        super.bindTo(fileName, is, offset, end);
+        loader.bindTo(fileName, is, offset, end);
     }
     
     @Override
     public Tuple getNext() throws IOException {
-        long initialPos = in.getPosition();
-        Tuple t = super.getNext();
-        long finalPos = in.getPosition();
+        long initialPos = loader.getPosition();
+        Tuple t = loader.getNext();
+        long finalPos = loader.getPosition();
         
         long toSkip = skipInterval - (finalPos - initialPos);
         if (toSkip > 0) {
-            long rc = in.skip(toSkip);
+            long rc = loader.skip(toSkip);
             
             // if we did not skip enough
             // in the first attempt, call
@@ -56,7 +77,7 @@
             // skip enough
             long remainingSkip = toSkip - rc;
             while(remainingSkip > 0) {
-                rc = in.skip(remainingSkip);
+                rc = loader.skip(remainingSkip);
                 if(rc == 0) {
                     // underlying stream saw EOF
                     break;
@@ -67,13 +88,46 @@
         return t;
     }
     
-    @Override
-    public void bindTo(OutputStream os) throws IOException {
-        int errCode = 2101;
-        String msg = this.getClass().getName() + " should not be used for 
storing.";
-        throw new ExecException(msg, errCode, PigException.BUG);
+    public Integer bytesToInteger(byte[] b) throws IOException {
+        return loader.bytesToInteger(b);
     }
-    
-    
-    
-}
+
+    // The conversion and schema methods below forward unchanged to the
+    // wrapped loader, so sampled data is interpreted exactly as that
+    // loader would interpret it.
+
+    /** Delegates to the wrapped loader's {@code bytesToLong}. */
+    public Long bytesToLong(byte[] b) throws IOException {
+        return loader.bytesToLong(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToFloat}. */
+    public Float bytesToFloat(byte[] b) throws IOException {
+        return loader.bytesToFloat(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToDouble}. */
+    public Double bytesToDouble(byte[] b) throws IOException {
+        return loader.bytesToDouble(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToCharArray}. */
+    public String bytesToCharArray(byte[] b) throws IOException {
+        return loader.bytesToCharArray(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToMap}. */
+    public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+        return loader.bytesToMap(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToTuple}. */
+    public Tuple bytesToTuple(byte[] b) throws IOException {
+        return loader.bytesToTuple(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code bytesToBag}. */
+    public DataBag bytesToBag(byte[] b) throws IOException {
+        return loader.bytesToBag(b);
+    }
+
+    /** Delegates to the wrapped loader's {@code fieldsToRead}. */
+    public void fieldsToRead(Schema schema) {
+        loader.fieldsToRead(schema);
+    }
+
+    /**
+     * Delegates schema discovery to the wrapped loader.
+     * @return whatever schema the wrapped loader reports for fileName
+     */
+    public Schema determineSchema(
+            String fileName,
+            ExecType execType,
+            DataStorage storage) throws IOException {
+        return loader.determineSchema(fileName, execType, storage);
+    }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java?rev=789523&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSampleOptimizer.java Tue Jun 
30 01:10:29 2009
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SampleOptimizer;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SampleOptimizer}: plan shape before and after the
+ * optimizer runs, and an end-to-end order by on a small file.
+ */
+public class TestSampleOptimizer {
+
+    private static final String RANDOM_SAMPLE_LOADER =
+            "org.apache.pig.impl.builtin.RandomSampleLoader";
+
+    static PigContext pc;
+    static {
+        pc = new PigContext(ExecType.MAPREDUCE, MiniCluster.buildCluster().getProperties());
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            // Every test needs a connected context; fail fast with the real
+            // cause instead of printing it and failing confusingly later.
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /**
+     * Count the MR jobs on the chain that starts at the plan's first root
+     * and follows each job's first successor.
+     */
+    private static int countJobsInChain(MROperPlan mrPlan) {
+        int count = 1;
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        List<MapReduceOper> succs;
+        // getSuccessors returns null once the end of the chain is reached.
+        while ((succs = mrPlan.getSuccessors(mrOper)) != null) {
+            mrOper = succs.get(0);
+            ++count;
+        }
+        return count;
+    }
+
+    @Test
+    public void testOptimizerFired() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan(" B = order A by $0;");
+        LogicalPlan lp = planTester.buildPlan("store B into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3, countJobsInChain(mrPlan));
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        // After optimizer visits, number of MR jobs = 2.
+        assertEquals(2, countJobsInChain(mrPlan));
+
+        // Test if RandomSampleLoader got pushed to top.
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        List<PhysicalOperator> phyOps = mrOper.mapPlan.getRoots();
+        assertEquals(1, phyOps.size());
+        assertTrue(phyOps.get(0) instanceof POLoad);
+        assertEquals(RANDOM_SAMPLE_LOADER, ((POLoad) phyOps.get(0)).getLFile().getFuncName());
+
+        // Test RandomSampleLoader is not present anymore in second MR job.
+        phyOps = mrPlan.getSuccessors(mrOper).get(0).mapPlan.getRoots();
+        assertEquals(1, phyOps.size());
+        assertTrue(phyOps.get(0) instanceof POLoad);
+        assertFalse(RANDOM_SAMPLE_LOADER.equals(((POLoad) phyOps.get(0)).getLFile().getFuncName()));
+    }
+
+    @Test
+    public void testOptimizerNotFired() throws Exception {
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan(" A = load 'input' using PigStorage('\t');");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan(" C = order B by $0;");
+        LogicalPlan lp = planTester.buildPlan("store C into 'output';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        // Before optimizer visits, number of MR jobs = 3.
+        assertEquals(3, countJobsInChain(mrPlan));
+
+        SampleOptimizer so = new SampleOptimizer(mrPlan);
+        so.visit();
+
+        // After optimizer visits, number of MR jobs = 3, since here the
+        // optimizer is not fired.
+        assertEquals(3, countJobsInChain(mrPlan));
+
+        // Test Sampler is not moved and is present in 2nd MR job.
+        MapReduceOper mrOper = mrPlan.getRoots().get(0);
+        List<PhysicalOperator> phyOps = mrOper.mapPlan.getRoots();
+        assertEquals(1, phyOps.size());
+        assertTrue(phyOps.get(0) instanceof POLoad);
+        assertFalse(RANDOM_SAMPLE_LOADER.equals(((POLoad) phyOps.get(0)).getLFile().getFuncName()));
+
+        phyOps = mrPlan.getSuccessors(mrOper).get(0).mapPlan.getRoots();
+        assertEquals(1, phyOps.size());
+        assertTrue(phyOps.get(0) instanceof POLoad);
+        assertEquals(RANDOM_SAMPLE_LOADER, ((POLoad) phyOps.get(0)).getLFile().getFuncName());
+    }
+
+    // End to end is more comprehensively tested in TestOrderBy and TestOrderBy2.
+    // But since those tests are currently excluded, this simple end to end test
+    // is included.
+    @Test
+    public void testEndToEnd() throws Exception {
+        PigServer pigServer = new PigServer(pc);
+        final int LOOP_COUNT = 40;
+        File tmpFile = File.createTempFile("test", ".txt");
+        try {
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            try {
+                Random r = new Random(3);
+                for (int i = 0; i < LOOP_COUNT; i++) {
+                    ps.println(r.nextInt(100));
+                }
+            } finally {
+                ps.close();
+            }
+
+            pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString())
+                    + "' using PigStorage() AS (num:int);");
+            pigServer.registerQuery("B = order A by num desc;");
+            Iterator<Tuple> result = pigServer.openIterator("B");
+
+            int rows = 0;
+            Integer prevNum = null;
+            while (result.hasNext()) {
+                Integer curNum = (Integer) result.next().get(0);
+                if (prevNum != null) {
+                    assertTrue(curNum.compareTo(prevNum) <= 0);
+                }
+                prevNum = curNum;
+                ++rows;
+            }
+            // Sorting must neither drop nor duplicate rows.
+            assertEquals(LOOP_COUNT, rows);
+        } finally {
+            // Clean up even when an assertion above fails.
+            tmpFile.delete();
+        }
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=789523&r1=789522&r2=789523&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Tue 
Jun 30 01:10:29 2009
@@ -78,7 +78,7 @@
             |       |   |
             |       |   Project[tuple][*] - -131
             |       |
-            |       
|---Load(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader)
 - -129
+            |       
|---Load(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader('org.apache.pig.builtin.BinStorage','100'))
 - -129
             |
             |---MapReduce(-1) - -127:
                 Reduce Plan Empty


Reply via email to