Author: pradeepkth
Date: Thu Nov 19 00:58:17 2009
New Revision: 882021

URL: http://svn.apache.org/viewvc?rev=882021&view=rev
Log:
PIG-966: load-store-redesign branch: change SampleLoader and subclasses to work with new LoadFunc interface (thejas via pradeepkth)

Added:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
Modified:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 19 00:58:17 2009
@@ -44,7 +44,7 @@
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.builtin.MergeJoinIndexer;
-import org.apache.pig.impl.builtin.TupleSize;
+import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -1387,6 +1387,7 @@
                                throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.");
                        }
                        
+                       //change plan to store the first join input into a temp file
                        FileSpec fSpec = getTempFileSpec();
                        MapReduceOper mro = compiledInputs[0];
                        POStore str = getStore();
@@ -1460,7 +1461,10 @@
                        }                     
                        
                        // run POPartitionRearrange for second join table
-                       lr = new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+                       POPartitionRearrange pr = 
+                           new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+                       pr.setPigContext(pigContext);
+                       lr = pr;
                        try {
                                lr.setIndex(1);
                        } catch (ExecException e) {
@@ -1817,7 +1821,7 @@
         
        PhysicalPlan ep = new PhysicalPlan();
        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
-                   new FuncSpec(TupleSize.class.getName(), (String[])null));
+                   new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
        uf.setResultType(DataType.TUPLE);
        ep.add(uf);     
        ep.add(prjStar);
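
For context: the FuncSpec swap above changes which EvalFunc the sampling job's
inner plan applies to each sampled row, so rows are now annotated with
(memorySize, numRows) by GetMemNumRows instead of (memorySize, diskSize) by
TupleSize. A minimal sketch of what the new FuncSpec line constructs
(illustrative only; this helper class is not part of the commit):

    import org.apache.pig.FuncSpec;
    import org.apache.pig.impl.builtin.GetMemNumRows;

    public class FuncSpecSketch {
        public static FuncSpec sizeAndRowsSpec() {
            // resolves the UDF by class name at runtime; no constructor args
            return new FuncSpec(GetMemNumRows.class.getName(), (String[]) null);
        }
    }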

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Thu Nov 19 00:58:17 2009
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 
 
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.builtin.BinStorage;
@@ -32,6 +33,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -66,14 +68,15 @@
      * 
      */
     private static final long serialVersionUID = 1L;
-       private String partitionFile;
-       private Integer totalReducers = -1;
-       // ReducerMap will store the tuple, max reducer index & min reducer index
-       private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
-       private boolean loaded;
-
-       protected static final BagFactory mBagFactory = BagFactory.getInstance();
-
+    private String partitionFile;
+    private Integer totalReducers = -1;
+    // ReducerMap will store the tuple, max reducer index & min reducer index
+    private static Map<Object, Pair<Integer, Integer> > reducerMap = new HashMap<Object, Pair<Integer, Integer> >();
+    private boolean loaded;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    private PigContext pigContext;
+    
     public POPartitionRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -102,17 +105,22 @@
                partitionFile = file;
        }
 
-       /* Loads the key distribution file obtained from the sampler */
-       private void loadPartitionFile() throws RuntimeException {
-               try {
-                       Integer [] redCnt = new Integer[1]; 
-                       reducerMap = MapRedUtil.loadPartitionFile(partitionFile, redCnt, null, DataType.NULL);
-                       totalReducers = redCnt[0];
-                       loaded = true;
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
-       }
+    /* Loads the key distribution file obtained from the sampler */
+    private void loadPartitionFile() throws RuntimeException {
+        try {
+            Integer [] redCnt = new Integer[1]; 
+            
+            reducerMap = MapRedUtil.loadPartitionFile(partitionFile, 
+                    redCnt, 
+                    ConfigurationUtil.toConfiguration(pigContext.getProperties()),
+                    DataType.NULL
+            );
+            totalReducers = redCnt[0];
+            loaded = true;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     @Override
     public String name() {
@@ -234,6 +242,20 @@
     }
 
     /**
+     * @param pigContext the pigContext to set
+     */
+    public void setPigContext(PigContext pigContext) {
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * @return the pigContext
+     */
+    public PigContext getPigContext() {
+        return pigContext;
+    }
+
+    /**
      * Make a deep copy of this operator.  
      * @throws CloneNotSupportedException
      */
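
Why the new setPigContext hook matters: loadPartitionFile() now passes a
Configuration built from the PigContext properties instead of null, so the
operator must receive a PigContext before it reads the sampler's key
distribution. A minimal wiring sketch (illustrative; the scope string, node
id, and class name PartitionRearrangeWiring are hypothetical placeholders,
not part of this commit):

    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;

    public class PartitionRearrangeWiring {
        public static POPartitionRearrange wire(PigContext pc, int parallelism) {
            POPartitionRearrange pr =
                new POPartitionRearrange(new OperatorKey("scope", 1L), parallelism);
            // must be set before the partition file is read, mirroring MRCompiler
            pr.setPigContext(pc);
            return pr;
        }
    }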

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=882021&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GetMemNumRows.java Thu Nov 19 00:58:17 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * UDF that gets the memory size of a tuple and extracts the number-of-rows
+ * value from the special tuple created by PoissonSampleLoader.
+ * It is used by skewed join.
+ * 
+ */
+
+public class GetMemNumRows extends EvalFunc<Tuple>{          
+
+    private TupleFactory factory;
+       
+    public GetMemNumRows() {
+       factory = TupleFactory.getInstance();
+    }      
+
+    /**
+     * @param  in - input tuple
+     * @return - tuple containing the in-memory size of the input tuple, and
+     * numRows if this is the specially marked tuple carrying the number-of-rows field 
+     */    
+    public Tuple exec(Tuple in) throws IOException {
+       if (in == null) {
+           return null;
+       }
+       long memSize = in.getMemorySize();
+       long numRows = 0;
+
+       
+       //  if this is a specially marked tuple, get the number of rows
+        int tSize = in.size();
+       if(tSize >=2 &&
+               in.get(tSize-2).equals(PoissonSampleLoader.NUMROWS_TUPLE_MARKER)){
+           numRows = (Long)in.get(tSize-1);
+       }
+       
+       //create tuple to be returned
+       Tuple t = factory.newTuple(2);
+       t.set(0, memSize);
+       t.set(1, numRows);
+       return t;
+    }
+    
+    public Type getReturnType(){
+        return Tuple.class;
+    }       
+}
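
A hedged usage sketch of the two cases exec() handles (illustrative only, not
part of this commit; assumes the default TupleFactory):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.builtin.GetMemNumRows;
    import org.apache.pig.impl.builtin.PoissonSampleLoader;

    public class GetMemNumRowsDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            GetMemNumRows udf = new GetMemNumRows();

            // ordinary sample row: returns (memSize, 0)
            Tuple plain = tf.newTuple(1);
            plain.set(0, "some-key");
            System.out.println(udf.exec(plain));

            // specially marked row: returns (memSize, numRows)
            Tuple marked = tf.newTuple(3);
            marked.set(0, "some-key");
            marked.set(1, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
            marked.set(2, 42L);
            System.out.println(udf.exec(marked));
        }
    }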

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Thu Nov 19 00:58:17 2009
@@ -33,7 +33,6 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.Pair;
 
 /**
@@ -82,8 +81,6 @@
 
        private String inputFile_;
 
-       private long inputFileSize_;
-
        private long totalSampleCount_;
 
        private double heapPercentage_;
@@ -100,7 +97,6 @@
        public PartitionSkewedKeys(String[] args) {
                totalReducers_ = -1;
                currentIndex_ = 0;
-               inputFileSize_ = -1;
 
                if (args != null && args.length > 0) {
                        heapPercentage_ = Double.parseDouble(args[0]);
@@ -123,187 +119,177 @@
         * first field in the input tuple is the number of reducers
         * 
         * second field is the *sorted* bag of samples
+        * this should be called only once
         */
        public Map<String, Object> exec(Tuple in) throws IOException {
-               // get size of input file in bytes
-               if (inputFileSize_ == -1) {
-                       try {
-                               inputFileSize_ = FileLocalizer.getSize(inputFile_);
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-
-               Map<String, Object> output = new HashMap<String, Object>();
-
-               if (in == null || in.size() == 0) {                     
-                       return null;
-               }
-
-               totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
-               log.info("Maximum of available memory is " + totalMemory_);
-
-               ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
-               Tuple currentTuple = null;
-               long count = 0;
-               long totalMSize = 0;
-               long totalDSize = 0;
-               try {
-                       totalReducers_ = (Integer) in.get(0);
-                       DataBag samples = (DataBag) in.get(1);
-
-                       totalSampleCount_ = samples.size();
-                       
-                       log.info("inputFileSize: " + inputFileSize_);
-                       log.info("totalSample: " + totalSampleCount_);
-                       log.info("totalReducers: " + totalReducers_);
-
-                       int maxReducers = 0;
-                       Iterator<Tuple> iter = samples.iterator();
-                       while (iter.hasNext()) {
-                               Tuple t = iter.next();
-                               if (hasSameKey(currentTuple, t) || currentTuple == null) {
-                                       count++;
-                                       totalMSize += getMemorySize(t);
-                                       totalDSize += getDiskSize(t);
-                               } else {
-                                       Pair<Tuple, Integer> p = calculateReducers(currentTuple,
-                                                       count, totalMSize, totalDSize);
-                                       Tuple rt = p.first;
-                                       if (rt != null) {
-                                               reducerList.add(rt);
-                                       }
-                                       if (maxReducers < p.second) {
-                                               maxReducers = p.second;
-                                       }
-                                       count = 1;
-                                       totalMSize = getMemorySize(t);
-                                       totalDSize = getDiskSize(t);
-                               }
-
-                               currentTuple = t;
-                       }
-
-                       // add last key
-                       if (count > 0) {
-                               Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
-                                               totalMSize, totalDSize);
-                               Tuple rt = p.first;
-                               if (rt != null) {
-                                       reducerList.add(rt);
-                               }
-                               if (maxReducers < p.second) {
-                                       maxReducers = p.second;
-                               }
-                       }
-
-                       if (maxReducers > totalReducers_) {
-                               if(pigLogger != null) {
-                    pigLogger.warn(this,"You need at least " + maxReducers
-                               + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
-                } else {
-                               log.warn("You need at least " + maxReducers
-                                               + " reducers to avoid spillage and run this job efficiently.");
-                }
-                       }
-
-                       output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
-                       output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-                       
-                       log.info(output.toString());
-                       if (log.isDebugEnabled()) {
-                               log.debug(output.toString());
-                       }
-
-                       return output;
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       throw new RuntimeException(e);
-               }
+           if (in == null || in.size() == 0) {                     
+               return null;
+           }
+           Map<String, Object> output = new HashMap<String, Object>();
+
+           totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+           log.info("Maximum of available memory is " + totalMemory_);
+
+           ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+           Tuple currentTuple = null;
+           long count = 0;
+           
+           // total size in memory for tuples in sample 
+           long totalSampleMSize = 0;
+           
+           //total input rows for the join
+           long totalInputRows = 0;
+           
+           try {
+               totalReducers_ = (Integer) in.get(0);
+               DataBag samples = (DataBag) in.get(1);
+
+               totalSampleCount_ = samples.size();
+
+               log.info("totalSample: " + totalSampleCount_);
+               log.info("totalReducers: " + totalReducers_);                   
+
+               int maxReducers = 0;
+
+               // first iterate the samples to find total number of rows
+               Iterator<Tuple> iter1 = samples.iterator();
+               while (iter1.hasNext()) {
+                   Tuple t = iter1.next();
+                   totalInputRows += (Long)t.get(t.size() - 1);
+               }
+                               
+               // now iterate samples to do the reducer calculation
+               Iterator<Tuple> iter2 = samples.iterator();
+               while (iter2.hasNext()) {
+                   Tuple t = iter2.next();
+                   if (hasSameKey(currentTuple, t) || currentTuple == null) {
+                       count++;
+                       totalSampleMSize += getMemorySize(t);
+                   } else {
+                       Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+                               count, totalSampleMSize, totalInputRows);
+                       Tuple rt = p.first;
+                       if (rt != null) {
+                           reducerList.add(rt);
+                       }
+                       if (maxReducers < p.second) {
+                           maxReducers = p.second;
+                       }
+                       count = 1;
+                       totalSampleMSize = getMemorySize(t);
+                   }
+
+                   currentTuple = t;
+               }
+
+               // add last key
+               if (count > 0) {
+                   Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+                           totalSampleMSize, totalInputRows);
+                   Tuple rt = p.first;
+                   if (rt != null) {
+                       reducerList.add(rt);
+                   }
+                   if (maxReducers < p.second) {
+                       maxReducers = p.second;
+                   }
+               }
+
+               if (maxReducers > totalReducers_) {
+                   if(pigLogger != null) {
+                       pigLogger.warn(this,"You need at least " + maxReducers
+                               + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+                   } else {
+                       log.warn("You need at least " + maxReducers
+                               + " reducers to avoid spillage and run this job efficiently.");
+                   }
+               }
+
+               output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+               output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+               log.info(output.toString());
+               if (log.isDebugEnabled()) {
+                   log.debug(output.toString());
+               }
+
+               return output;
+           } catch (Exception e) {
+               e.printStackTrace();
+               throw new RuntimeException(e);
+           }
        }
 
        private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
-                       long count, long totalMSize, long totalDSize) {
-               // get average memory size per tuple
-               double avgM = totalMSize / (double) count;
-               // get average disk size per tuple
-               double avgD = totalDSize / (double) count;
-
-               // get the number of tuples that can fit into memory
-               long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
-               // get the number of total tuples for this key
-               long tupleCount = (long) (((double) count) / totalSampleCount_
-                               * inputFileSize_ / avgD);       
-
-
-               int redCount = (int) Math.round(Math.ceil((double) tupleCount
-                               / tupleMCount));
-
-               if (log.isDebugEnabled()) 
-               {
-                       log.debug("avgM: " + avgM);
-                       log.debug("avgD: " + avgD);
-                       log.debug("count: " + count);
-                       log.debug("A reducer can take " + tupleMCount + " tuples and "
-                                       + tupleCount + " tuples are find for " + currentTuple);
-                       log.debug("key " + currentTuple + " need " + redCount + " reducers");
-               }
+               long count, long totalMSize, long totalTuples) {
+           // get average memory size per tuple
+           double avgM = totalMSize / (double) count;
+
+           // get the number of tuples that can fit into memory
+           long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+           // estimate the number of total tuples for this key
+            long keyTupleCount = (long)  ( ((double) count/ totalSampleCount_) * 
+                        totalTuples);
+                            
+           
+           int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
+                   / tupleMCount));
+
+           if (log.isDebugEnabled()) 
+           {
+               log.debug("avgM: " + avgM);
+               log.debug("tuple count: " + keyTupleCount);
+               log.debug("count: " + count);
+               log.debug("A reducer can take " + tupleMCount + " tuples and "
+                       + keyTupleCount + " tuples are found for " + currentTuple);
+               log.debug("key " + currentTuple + " needs " + redCount + " reducers");
+           }
+
+           // this is not a skewed key
+           if (redCount == 1) {
+               return new Pair<Tuple, Integer>(null, 1);
+           }
+
+           Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+           int i = 0;
+           try {
+               // set keys
+               for (; i < currentTuple.size() - 2; i++) {
+                   t.set(i, currentTuple.get(i));
+               }
+
+               // set the min index of reducer for this key
+               t.set(i++, currentIndex_);
+               currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
+               if (currentIndex_ < 0) {
+                   currentIndex_ += totalReducers_;
+               }
+               // set the max index of reducer for this key
+               t.set(i++, currentIndex_);
+           } catch (ExecException e) {
+               throw new RuntimeException("Failed to set value to tuple." + e);
+           }
 
-               // this is not a skewed key
-               if (redCount == 1) {
-                       return new Pair<Tuple, Integer>(null, 1);
-               }
+           currentIndex_ = (currentIndex_ + 1) % totalReducers_;
 
-               Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
-               int i = 0;
-               try {
-                       // set keys
-                       for (; i < currentTuple.size() - 2; i++) {
-                               t.set(i, currentTuple.get(i));
-                       }
+           Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
 
-                       // set the min index of reducer for this key
-                       t.set(i++, currentIndex_);
-                       currentIndex_ = (currentIndex_ + redCount) % totalReducers_ - 1;
-                       if (currentIndex_ < 0) {
-                               currentIndex_ += totalReducers_;
-                       }
-                       // set the max index of reducer for this key
-                       t.set(i++, currentIndex_);
-               } catch (ExecException e) {
-                       throw new RuntimeException("Failed to set value to tuple." + e);
-               }
-
-               currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
-               Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
-               return p;
+           return p;
        }
 
        // the last field of the tuple is a tuple for memory size and disk size
        private long getMemorySize(Tuple t) {
-               int s = t.size();
-               try {
-                       return (Long) t.get(s - 2);
-               } catch (ExecException e) {
-                       throw new RuntimeException(
-                                       "Unable to retrive the size field from tuple.", e);
-               }
+           int s = t.size();
+           try {
+               return (Long) t.get(s - 2);
+           } catch (ExecException e) {
+               throw new RuntimeException(
+                       "Unable to retrieve the size field from tuple.", e);
+           }
        }
 
-       // the last field of the tuple is a tuple for memory size and disk size
-       private long getDiskSize(Tuple t) {
-               int s = t.size();
-               try {
-                       return (Long) t.get(s - 1);
-               } catch (ExecException e) {
-                       throw new RuntimeException(
-                                       "Unable to retrive the size field from tuple.", e);
-               }
-       }
 
        private boolean hasSameKey(Tuple t1, Tuple t2) {
                // Have to break the tuple down and compare it field to field.
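
The reducer estimate above reduces to: scale a key's share of the sample up to
the total input rows (now obtained from the sampler's special rows rather than
from file size), then divide by how many tuples fit in a reducer's heap. A
standalone arithmetic sketch (illustrative numbers and class name, not part of
this commit):

    public class SkewEstimateSketch {
        public static void main(String[] args) {
            long totalInputRows   = 1000000L; // sum of numRows from special tuples
            long totalSampleCount = 1700L;    // tuples in the sample bag
            long keySampleCount   = 340L;     // sample tuples seen for one hot key
            long avgTupleMemSize  = 512L;     // bytes per tuple, from GetMemNumRows
            double heapPercentage = 0.3;      // pig.skewedjoin.reduce.memusage

            long totalMemory = (long) (Runtime.getRuntime().maxMemory() * heapPercentage);
            long tuplesPerReducer = totalMemory / avgTupleMemSize;
            long keyTupleCount =
                    (long) (((double) keySampleCount / totalSampleCount) * totalInputRows);
            int redCount = (int) Math.ceil((double) keyTupleCount / tuplesPerReducer);
            // redCount > 1 marks the key as skewed and assigns it a reducer range
            System.out.println("estimated reducers for key: " + redCount);
        }
    }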

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -21,65 +21,152 @@
 import java.util.ArrayList;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.PigException;
+
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.Pair;
 
 /**
- * Currently skipInterval is similar to the randomsampleloader. However, if we were to use an
- * uniform distribution, we could precompute the intervals and read it from a file.
- *
+ * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
  */
-//XXX : FIXME - make this work with new load-store redesign
 public class PoissonSampleLoader extends SampleLoader {
        
-       // Base number of samples needed
-       private long baseNumSamples;
+        // marker string for special row with total number of rows. 
+        // this will be the value of the first column in the special row
+        public static final String NUMROWS_TUPLE_MARKER = 
+            "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
+        
+        //num of rows sampled so far
+        private int numRowsSampled = 0;
+        
+        //average size of tuple in memory, for tuples sampled
+        private long avgTupleMemSz = 0;
+       
+       //current row number 
+       private long rowNum = 0;
        
-       /// Count of the map splits
-       private static final String MAPSPLITS_COUNT = "pig.mapsplits.count";
+       // number of tuples to skip after each sample
+        long skipInterval = -1;
+
+       // bytes in input to skip after every sample. 
+        // divide this by avgTupleMemSize to get skipInterval 
+       private long memToSkipPerSample = 0;
        
-       /// Conversion factor accounts for the various encodings, compression etc
-       private static final String CONV_FACTOR = "pig.inputfile.conversionfactor";
+       // has the special row with row number information been returned
+       private boolean numRowSplTupleReturned = false;
        
        /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
        private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
        
+       // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
+        // set to 10 (empirically, minimum number of samples) and the confidence set to 95%
+        private static final int DEFAULT_SAMPLE_RATE = 17;
+        
+        private int sampleRate = DEFAULT_SAMPLE_RATE;
+       
        /// % of memory available for the input data. This is currenty equal to the memory available
        /// for the skewed join
        private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
+
+        private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+               
+        // new Sample tuple
+        private Tuple newSample = null;
+        
+//     private final Log log = LogFactory.getLog(getClass());
        
-       // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
-       // set to 10 (emperically, minimum number of samples) and the confidence set to 95%
-       private static final int DEFAULT_SAMPLE_RATE = 17;
-       
-       // By default the data is multiplied by 2 to account for the encoding
-       private static final int DEFAULT_CONV_FACTOR = 2;
-       
-       private final Log log = LogFactory.getLog(getClass());
 
        public PoissonSampleLoader(String funcSpec, String ns) {
                super(funcSpec);
                super.setNumSamples(Integer.valueOf(ns)); // will be overridden
        }
-       
-       // n is the number of map tasks
-       @Override
-       public void setNumSamples(int n) {
-               super.setNumSamples(n); // will be overridden
+
+       /* (non-Javadoc)
+        * @see org.apache.pig.LoadFunc#getNext()
+        */
+       public Tuple getNext() throws IOException {
+           if(numRowSplTupleReturned){
+               // row num special row has been returned after all inputs 
+               // were read, nothing more to read 
+               return null;
+           }
+           
+
+           if(skipInterval == -1){
+               //select first tuple as sample and calculate
+               // number of tuples to be skipped 
+               Tuple t = loader.getNext();
+               if(t == null)
+                   return createNumRowTuple(null);
+               long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+               memToSkipPerSample = availRedMem/sampleRate;
+               updateSkipInterval(t);
+                
+               rowNum++;
+                newSample = t;
+           }
+
+           // skip tuples
+           for(long numSkipped  = 0; numSkipped < skipInterval; numSkipped++){
+               if(!skipNext()){
+                   return createNumRowTuple(newSample);
+               }
+               rowNum++;
+           }
+           
+           // skipped enough, get new sample
+           Tuple t = loader.getNext();
+           if(t == null)
+               return createNumRowTuple(newSample);
+           updateSkipInterval(t);
+           rowNum++;
+            Tuple currentSample = newSample;
+            newSample = t;
+            return currentSample;
        }
-       
+
+       /**
+        * Update the average tuple size based on newly sampled tuple t
+        * and recalculate skipInterval
+        * @param t - tuple
+        */
+       private void updateSkipInterval(Tuple t) {
+           avgTupleMemSz = 
+               ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
+           skipInterval = memToSkipPerSample/avgTupleMemSz;
+           
+            // Skip fewer rows the first few times, to reduce the probability
+            // that the first tuple's size (if much smaller than the rest)
+            // results in very few samples being taken.
+            // Sampling a little extra is OK.
+           if(numRowsSampled < 5)
+               skipInterval = skipInterval/(10-numRowsSampled);
+            ++numRowsSampled;
+
+       }
+
+       /**
+        * @param sample - sample tuple
+        * @return - Tuple appended with special marker string column, num-rows column
+        * @throws ExecException
+        */
+       private Tuple createNumRowTuple(Tuple sample) throws ExecException {
+           if(rowNum == 0 || sample == null)
+               return null;
+           TupleFactory factory = TupleFactory.getInstance();
+           Tuple t = factory.newTuple(sample.size() + 2);
+           for(int i=0; i<sample.size(); i++){
+               t.set(i, sample.get(i));
+           }
+           t.set(sample.size(), NUMROWS_TUPLE_MARKER);
+           t.set(sample.size() + 1, rowNum);
+           numRowSplTupleReturned = true;
+           return t;
+       }
+
        /**
         * Computes the number of samples for the loader
         * 
@@ -89,100 +176,28 @@
         */
        @Override
        public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
-               int numSplits, convFactor, sampleRate;
-               Properties pcProps = pc.getProperties();
-               
-               // Set default values for the various parameters
-               try {
-                       numSplits = Integer.valueOf(pcProps.getProperty(MAPSPLITS_COUNT));
-               } catch (NumberFormatException e) {
-                       String msg = "Couldn't retrieve the number of maps in the job";
-                       throw new ExecException(msg);
-               }
-               
-               try {
-                       convFactor = Integer.valueOf(pcProps.getProperty(CONV_FACTOR));
-               } catch (NumberFormatException e) {
-                       convFactor = DEFAULT_CONV_FACTOR;
-               }
-               
-               try {
-                       sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
-               } catch (NumberFormatException e) {
-                       sampleRate = DEFAULT_SAMPLE_RATE;
-               }
-               
-               // % of memory available for the records
-               float heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-                if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
-                   try {
-                        heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
-                    }catch(NumberFormatException e) {
-                       // ignore, use default value
-                    }
-                }
-               
-               // we are only concerned with the first input for skewed join
-               String fname = inputs.get(0).first.getFileName();
-               
-               // calculate the base number of samples
-               try {
-                       float f = (Runtime.getRuntime().maxMemory() * heapPerc) / (float) (FileLocalizer.getSize(fname,pcProps) * convFactor);
-                       baseNumSamples = (long) Math.ceil(1.0 / f);
-               } catch (IOException e) {
-                       int errCode = 2175;
-                       String msg = "Internal error. Could not retrieve file size for the sampler.";
-                       throw new ExecException(msg, errCode, PigException.BUG);
-               } catch (ArithmeticException e) {
-                       int errCode = 1105;
-                       String msg = "Heap percentage / Conversion factor cannot be set to 0";
-                       throw new ExecException(msg,errCode,PigException.INPUT);
-               }
-               
-               // set the number of samples
-               int n = (int) ((baseNumSamples * sampleRate) / numSplits);
-               
-               // set the minimum number of samples to 1
-               n = (n > 1) ? n : 1;
-               setNumSamples(n);
+           Properties pcProps = pc.getProperties();
+
+           // % of memory available for the records
+           heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
+           if (pcProps.getProperty(PERC_MEM_AVAIL) != null) {
+               try {
+                   heapPerc = Float.valueOf(pcProps.getProperty(PERC_MEM_AVAIL));
+               }catch(NumberFormatException e) {
+                   // ignore, use default value
+               }
+           }
+
+           try {
+               sampleRate = Integer.valueOf(pcProps.getProperty(SAMPLE_RATE));
+           } catch (NumberFormatException e) {
+               sampleRate = DEFAULT_SAMPLE_RATE;
+           }
+
        }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getLoadCaster()
-     */
-    @Override
-    public LoadCaster getLoadCaster() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
-     */
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        // TODO Auto-generated method stub
-        
-    }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
-     */
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        // TODO Auto-generated method stub
-        
-    }
 
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
-     */
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-       
+
 
 }
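
The sampling scheme above keeps roughly one tuple per memToSkipPerSample bytes
of input: the byte budget per sample comes from the reducer heap and the
Poisson-derived sample rate, and is converted to a row count using the running
average tuple size. A standalone sketch of that arithmetic (illustrative
numbers and class name, not part of this commit):

    public class PoissonSkipSketch {
        public static void main(String[] args) {
            double heapPerc    = 0.3;  // pig.skewedjoin.reduce.memusage
            int sampleRate     = 17;   // poisson cdf, mean 10, 95% confidence
            long avgTupleMemSz = 256L; // running average over sampled tuples

            long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
            long memToSkipPerSample = availRedMem / sampleRate;
            long skipInterval = memToSkipPerSample / avgTupleMemSz;
            System.out.println("skip " + skipInterval + " tuples per sample");
        }
    }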

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Thu Nov 19 00:58:17 2009
@@ -18,23 +18,23 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.util.Random;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
 
 /**
- * A loader that samples the data.  This loader can subsume loader that
- * can handle starting in the middle of a record.  Loaders that can
- * handle this should implement the SamplableLoader interface.
+ * A loader that samples the data.  
+ * It randomly samples tuples from input. The number of tuples to be sampled
+ * has to be set before the first call to getNext().
+ *  See the documentation of the getNext() call.
  */
-//XXX : FIXME - make this work with new load-store redesign
 public class RandomSampleLoader extends SampleLoader {
  
+    //array to store the sample tuples
+    Tuple [] samples = null;
+    //index into samples array to the next sample to be returned 
+    protected int nextSampleIdx= 0;
+    
     /**
      * Construct with a class of loader to use.
      * @param funcSpec func spec of the loader to use.
@@ -49,61 +49,67 @@
         // set the number of samples
         super.setNumSamples(Integer.valueOf(ns));
     }
-    
-    
-    @Override
-    public void setNumSamples(int n) {
-       // Setting it to 100 as default for order by
-       super.setNumSamples(100);
-    }
 
     /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getInputFormat()
+     * @see org.apache.pig.LoadFunc#getNext()
+     * Allocate a buffer for numSamples elements, populate it with the 
+     * first numSamples tuples, and continue scanning rest of the input.
+     * For every ith next() call, we generate a random number r s.t. 0<=r<i,
+     * and if r<numSamples we insert the new tuple into our buffer at position r.
+     * This gives us a random sample of the tuples in the partition.
      */
     @Override
-    public InputFormat getInputFormat() throws IOException {
-        // TODO Auto-generated method stub
-        return loader.getInputFormat();
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#getLoadCaster()
-     */
-    @Override
-    public LoadCaster getLoadCaster() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
-     */
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        // TODO Auto-generated method stub
+    public Tuple getNext() throws IOException {
+
+        if(samples != null){
+            return getSample();
+        }
+        //else collect samples
+        samples = new Tuple[numSamples];
         
-    }
-
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
-     */
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        // TODO Auto-generated method stub
+        // populate the samples array with first numSamples tuples
+        Tuple t = null;
+        for(int i=0; i<numSamples; i++){
+            t = loader.getNext();
+            if(t == null)
+                break;
+            samples[i] = t;
+        }
         
+        // rowNum that starts from 1
+        int rowNum = numSamples+1;
+        Random randGen = new Random();
+
+        if(t != null){ // did not exhaust all tuples
+            while(true){
+                // collect samples until input is exhausted
+                int rand = randGen.nextInt(rowNum);
+                if(rand < numSamples){
+                    // pick this as sample
+                    Tuple sampleTuple = loader.getNext();
+                    if(sampleTuple == null)
+                        break;
+                    samples[rand] = sampleTuple;
+                }else {
+                    //skip tuple
+                    if(!skipNext())
+                        break;
+                }
+                rowNum++;
+            }
+        }        
+        
+        return getSample();
+    } 
+    
+    private Tuple getSample() {
+        if(nextSampleIdx < samples.length){
+            return samples[nextSampleIdx++];
+        }
+        else{
+            return null;
+        }
     }
 
-
-    /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#relativeToAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path)
-     */
-    @Override
-    public String relativeToAbsolutePath(String location, Path curDir)
-            throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
  
 }
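
The getNext() javadoc above describes classic reservoir sampling. A
self-contained sketch of the same scheme (illustrative, not part of this
commit; the real loader additionally calls skipNext() so non-sampled rows are
never deserialized into tuples):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class ReservoirSketch {
        static <T> List<T> sample(Iterable<T> input, int k) {
            List<T> reservoir = new ArrayList<T>(k);
            Random rand = new Random();
            int i = 0;
            for (T item : input) {
                if (i < k) {
                    reservoir.add(item);         // fill the buffer first
                } else {
                    int r = rand.nextInt(i + 1); // 0 <= r <= i
                    if (r < k) {
                        reservoir.set(r, item);  // replace with probability k/(i+1)
                    }
                }
                i++;
            }
            return reservoir;
        }
    }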

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Thu Nov 19 00:58:17 2009
@@ -19,36 +19,32 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Map;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
-import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.Pair;
 
 /**
  * Abstract class that specifies the interface for sample loaders
  *
  */
-//XXX : FIXME - make this work with new load-store redesign
 public abstract class SampleLoader implements LoadFunc {
 
-       protected int numSamples;
-       protected long skipInterval;
+    // number of samples to be sampled
+    protected int numSamples;
+    
     protected LoadFunc loader;
-       private TupleFactory factory;
-       private boolean initialized = false;
-
+    
+    // RecordReader used by the underlying loader
+    private RecordReader<?, ?> recordReader= null;
     
     public SampleLoader(String funcSpec) {
        loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
@@ -66,22 +62,49 @@
      * @see org.apache.pig.LoadFunc#getInputFormat()
      */
     @Override
-    public InputFormat getInputFormat() throws IOException {
+    public InputFormat<?,?> getInputFormat() throws IOException {
         return loader.getInputFormat();
+    } 
+
+    public boolean skipNext() throws IOException {
+        try {
+            return recordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+            throw new IOException("Error getting input",e);
+        }
+    }
+    
+    public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc)
+    throws ExecException {
     }
+    
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        return loader.getLoadCaster();
+    }
+    
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir)
+            throws IOException {
+        return loader.relativeToAbsolutePath(location, curDir);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.hadoop.mapreduce.InputSplit)
+     */
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+        loader.prepareToRead(reader, split);
+        this.recordReader = reader;
+    }
+    
 
     /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#getNext()
-        */
-       public Tuple getNext() throws IOException {
-          // estimate how many tuples there are in the map
-          // based on the 
-           return null;   
-       }
-
-       public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
-               // TODO Auto-generated method stub
-               
-       }
+     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        loader.setLocation(location, job);
+    }
 
 }
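
After this change SampleLoader is a thin wrapper: every LoadFunc method
delegates to the wrapped loader, and skipNext() advances the underlying
RecordReader without materializing a tuple. A hypothetical subclass
(EveryNthSampleLoader is invented for illustration; the real subclasses are
RandomSampleLoader and PoissonSampleLoader):

    import java.io.IOException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.builtin.SampleLoader;

    public class EveryNthSampleLoader extends SampleLoader {
        private final int n;

        public EveryNthSampleLoader(String funcSpec, String nth) {
            super(funcSpec);            // instantiates the wrapped loader
            this.n = Integer.valueOf(nth);
        }

        @Override
        public Tuple getNext() throws IOException {
            // skip n-1 records cheaply via the RecordReader, then parse one
            for (int i = 0; i < n - 1; i++) {
                if (!skipNext()) {
                    return null;        // input exhausted while skipping
                }
            }
            return loader.getNext();
        }
    }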

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java?rev=882021&r1=882020&r2=882021&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/TupleSize.java Thu Nov 19 00:58:17 2009
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-/**
- * UDF to get memory and disk size of a tuple.
- * It is used by skewed join.
- * 
- */
-
-public class TupleSize extends EvalFunc<Tuple>{          
-
-       private TupleFactory factory;
-       
-    public TupleSize() {
-       factory = TupleFactory.getInstance();
-    }      
-
-    /**
-     * Get memory size and disk size of input tuple
-     */    
-    public Tuple exec(Tuple in) throws IOException {
-       if (in == null) {
-               return null;
-       }
-       
-       Tuple t = factory.newTuple(2);
-       t.set(0, in.getMemorySize());
-       t.set(1, in.get(in.size()-1));
-               
-       return t;
-    }
-    
-    public Type getReturnType(){
-        return Tuple.class;
-    }       
-}

