Author: pradeepkth
Date: Fri Apr 10 01:09:55 2009
New Revision: 763847

URL: http://svn.apache.org/viewvc?rev=763847&view=rev
Log:
PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs 
"FileSystem closed" error on large input

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Apr 10 01:09:55 2009
@@ -28,6 +28,9 @@
 
 BUG FIXES
 
+PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
+"FileSystem closed" error on large input (pradeepkth)
+
 PIG-693: Parameter to UDF which is an alias returned in another UDF in nested
 foreach causes incorrect results (thejas via sms)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
 Fri Apr 10 01:09:55 2009
@@ -36,6 +36,7 @@
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -149,6 +150,8 @@
     
     private CompilationMessageCollector messageCollector = null;
     
+    // Prefix that marks a FindQuantiles constructor argument as carrying the
+    // func spec of a user defined comparator instead of an asc/desc flag.
+    // Declared final: it is a shared constant and must never be reassigned.
+    public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
+    
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
@@ -1312,10 +1315,55 @@
         mro.reducePlan.add(nfe2);
         mro.reducePlan.connect(pkg, nfe2);
         
+        // Let's connect the output from the foreach containing
+        // number of quantiles and the sorted bag of samples to
+        // another foreach with the FindQuantiles udf. The input
+        // to the FindQuantiles udf is a project(*) which takes the 
+        // foreach input and gives it to the udf
+        PhysicalPlan ep4 = new PhysicalPlan();
+        POProject prjStar4 = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+        
+        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+        ufInps.add(prjStar4);
+        // Turn the asc/desc array into an array of strings so that we can 
pass it
+        // to the FindQuantiles function.
+        List<Boolean> ascCols = inpSort.getMAscCols();
+        String[] ascs = new String[ascCols.size()];
+        for (int i = 0; i < ascCols.size(); i++) ascs[i] = 
ascCols.get(i).toString();
+        // check if user defined comparator is used in the sort, if so
+        // prepend the name of the comparator as the first fields in the
+        // constructor args array to the FindQuantiles udf
+        String[] ctorArgs = ascs;
+        if(sort.isUDFComparatorUsed) {
+            String userComparatorFuncSpec = 
sort.getMSortFunc().getFuncSpec().toString();
+            ctorArgs = new String[ascs.length + 1];
+            ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
+            for(int j = 0; j < ascs.length; j++) {
+                ctorArgs[j+1] = ascs[j];
+            }
+        }
+        
+        POUserFunc uf = new POUserFunc(new 
OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, 
+            new FuncSpec(FindQuantiles.class.getName(), ctorArgs));
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+        
+        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POForEach nfe3 = new POForEach(new 
OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+        
+        mro.reducePlan.add(nfe3);
+        mro.reducePlan.connect(nfe2, nfe3);
+        
         POStore str = getStore();
         str.setSFile(quantFile);
         mro.reducePlan.add(str);
-        mro.reducePlan.connect(nfe2, str);
+        mro.reducePlan.connect(nfe3, str);
         
         mro.setReduceDone(true);
         mro.requestedParallelism = 1;

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
 Fri Apr 10 01:09:55 2009
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -29,12 +30,14 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullableBytesWritable;
@@ -68,6 +71,7 @@
         return gen.getNext();
     }
 
+    @SuppressWarnings("unchecked")
     public void configure(JobConf job) {
         this.job = job;
         String quantilesFile = job.get("pig.quantilesFile", "");
@@ -78,120 +82,41 @@
         try{
             InputStream is = 
FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
             BinStorage loader = new BinStorage();
-            ArrayList<PigNullableWritable> quantilesList = new 
ArrayList<PigNullableWritable>();
+            DataBag quantilesList;
             loader.bindTo(quantilesFile, new 
BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
             Tuple t = loader.getNext();
             if(t==null) throw new RuntimeException("Empty samples file");
             // the Quantiles file has a tuple as under:
             // (numQuantiles, bag of samples) 
             // numQuantiles here is the reduce parallelism
-            numQuantiles = (Integer) t.get(0);
-            samples = (DataBag) t.get(1);
-            long numSamples = samples.size();
-            long toSkip = numSamples / numQuantiles;
-            if(toSkip == 0) {
-                // numSamples is < numQuantiles;
-                // set numQuantiles to numSamples
-                numQuantiles = (int)numSamples;
-                toSkip = 1;
-            }
-            
-            long ind=0, j=-1, nextQuantile = toSkip-1;
-            for (Tuple it : samples) {
-                if (ind==nextQuantile){
-                    ++j;
-                    quantilesList.add(getPigNullableWritable(it));
-                    nextQuantile+=toSkip;
-                    if(j==numQuantiles-1)
-                        break;
-                }
-                ind++;
-                //TODO how do we report progress?
-                //if (i % 1000 == 0) progress();
-                // Currently there is no way to report progress since 
-                // in configure() we cannot get a handle to the reporter
-                // (even PhysicalOperator.getReporter() does not work! It is
-                // set to null.) Hopefully the work done here wll be < 10 
minutes
-                // since we are dealing with 100* num_mapper samples. When
-                // RandomSampleLoader becomes an operator or UDF instead of a
-                // loader hopefully we can intelligently decide on the number
-                // of samples (instead of the static 100) and then not being
-                // able to report progress may not be a big issue.
-            }
+            Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
+            quantilesList = (DataBag) 
quantileMap.get(FindQuantiles.QUANTILES_LIST);
+            Map<Tuple, Tuple> weightedPartsData = (Map<Tuple, Tuple>) 
quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             convertToArray(quantilesList);
-            long i=-1;
-            Map<PigNullableWritable,CountingMap<Integer>> contribs = new 
HashMap<PigNullableWritable, CountingMap<Integer>>();
-            for (Tuple it : samples){
-                ++i;
-                PigNullableWritable sample = getPigNullableWritable(it);
-                int partInd = new Long(i/toSkip).intValue(); // which partition
-                if(partInd==numQuantiles) break;
-                // the quantiles array has the element from the sample which 
is the
-                // last element for a given partition. For example: if 
numQunatiles 
-                // is 5 and number of samples is 100, then toSkip = 20 
-                // quantiles[0] = sample[19] // the 20th element
-                // quantiles[1] = sample[39] // the 40th element
-                // and so on. For any element in the sample between 0 and 19, 
partInd
-                // will be 0. We want to check if a sample element which is
-                // present between 0 and 19 and is also the 19th (quantiles[0] 
element).
-                // This would mean that element might spread over the 0th and 
1st 
-                // partition. We are looking for contributions to a partition
-                // from such elements. 
-                
-                // First We only check for sample elements in partitions other 
than the last one
-                // < numQunatiles -1 (partInd is 0 indexed). 
-                if(partInd<numQuantiles-1 && 
areEqual(sample,quantiles[partInd])){
-                    if(!contribs.containsKey(sample)){
-                        CountingMap<Integer> cm = new CountingMap<Integer>();
-                        cm.put(partInd, 1);
-                        contribs.put(sample, cm);
-                    }
-                    else
-                        contribs.get(sample).put(partInd, 1);
-                }
-                else{ 
-                    // we are either in the last partition (last quantile)
-                    // OR the sample element we are currently processing is not
-                    // the same as the element in the quantile array for this 
partition
-                    // if we haven't seen this sample item earlier, this is 
not an
-                    // element which crosses partitions - so ignore
-                    if(!contribs.containsKey(sample))
-                        continue;
-                    else
-                        // we have seen this sample before (in a previous 
partInd), 
-                        // add to the contribution associated with this sample 
- if we had 
-                        // not seen this sample in a previous partInd, then we 
have not
-                        // had this in the contribs map! (because of the if 
above).This 
-                        // sample can either go to the previous partInd or 
this partInd 
-                        // in the final sort reduce stage. That is where the 
amount of 
-                        // contribution to each partInd will matter and 
influence the choice.
-                        contribs.get(sample).put(partInd, 1);
-                }
-            }
-            for(Entry<PigNullableWritable, CountingMap<Integer>> ent : 
contribs.entrySet()){
-                PigNullableWritable key = ent.getKey(); // sample item which 
repeats
-                
-                // this map will have the contributions of the sample item to 
the different partitions
-                CountingMap<Integer> value = ent.getValue(); 
-                
-                long total = value.getTotalCount();
-                float[] probVec = new float[numQuantiles];
-                // for each partition that this sample item is present in,
-                // compute the fraction of the total occurences for that
-                // partition - this will be the probability with which we
-                // will pick this partition in the final sort reduce job
-                // for this sample item
-                for (Entry<Integer,Integer> valEnt : value.entrySet()) {
-                    probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
-                }
-//                weightedParts.put(key, new 
DiscreteProbabilitySampleGenerator(11317,probVec));
-                weightedParts.put(key, new 
DiscreteProbabilitySampleGenerator(probVec));
+            for(Entry<Tuple, Tuple> ent : weightedPartsData.entrySet()){
+                Tuple key = ent.getKey(); // sample item which repeats
+                float[] probVec = getProbVec(ent.getValue());
+                weightedParts.put(getPigNullableWritable(key), 
+                        new DiscreteProbabilitySampleGenerator(probVec));
             }
         }catch (Exception e){
             throw new RuntimeException(e);
         }
     }
 
+    /**
+     * Copies the fields of a tuple into a primitive float array, casting
+     * each field to Float on the way.
+     *
+     * @param values tuple whose fields are read in order
+     * @return a new array with one entry per field of the tuple
+     * @throws ExecException propagated from reading a field of the tuple
+     */
+    private float[] getProbVec(Tuple values) throws ExecException {
+        int fieldCount = values.size();
+        float[] probabilities = new float[fieldCount];
+        int field = 0;
+        while (field < fieldCount) {
+            Float fraction = (Float) values.get(field);
+            probabilities[field] = fraction;
+            field++;
+        }
+        return probabilities;
+    }
+
+
     private PigNullableWritable getPigNullableWritable(Tuple t) {
         try {
             // user comparators work with tuples - so if user comparator
@@ -219,24 +144,38 @@
     }
 
     private void convertToArray(
-            ArrayList<PigNullableWritable> q) {
+            DataBag quantilesListAsBag) {
+        ArrayList<PigNullableWritable> quantilesList = 
getList(quantilesListAsBag);
         if ("true".equals(job.get("pig.usercomparator")) ||
-                q.get(0).getClass().equals(NullableTuple.class)) {
-            quantiles = q.toArray(new NullableTuple[0]);
-        } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
-            quantiles = q.toArray(new NullableBytesWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
-            quantiles = q.toArray(new NullableDoubleWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
-            quantiles = q.toArray(new NullableFloatWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
-            quantiles = q.toArray(new NullableIntWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
-            quantiles = q.toArray(new NullableLongWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableText.class)) {
-            quantiles = q.toArray(new NullableText[0]);
+                quantilesList.get(0).getClass().equals(NullableTuple.class)) {
+            quantiles = quantilesList.toArray(new NullableTuple[0]);
+        } else if 
(quantilesList.get(0).getClass().equals(NullableBytesWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableBytesWritable[0]);
+        } else if 
(quantilesList.get(0).getClass().equals(NullableDoubleWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableDoubleWritable[0]);
+        } else if 
(quantilesList.get(0).getClass().equals(NullableFloatWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableFloatWritable[0]);
+        } else if 
(quantilesList.get(0).getClass().equals(NullableIntWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableIntWritable[0]);
+        } else if 
(quantilesList.get(0).getClass().equals(NullableLongWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableLongWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableText.class)) 
{
+            quantiles = quantilesList.toArray(new NullableText[0]);
         } else {
             throw new RuntimeException("Unexpected class in " + 
this.getClass().getSimpleName());
         }
     }
+
+    /**
+     * Converts every tuple of the given bag with getPigNullableWritable and
+     * collects the results, in the bag's iteration order, into a new list.
+     *
+     * @param quantilesListAsBag bag whose tuples are to be converted
+     * @return a new list holding one converted entry per tuple of the bag
+     */
+    private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
+        ArrayList<PigNullableWritable> converted =
+            new ArrayList<PigNullableWritable>();
+        Iterator<Tuple> tuples = quantilesListAsBag.iterator();
+        while (tuples.hasNext()) {
+            converted.add(getPigNullableWritable(tuples.next()));
+        }
+        return converted;
+    }
 }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=763847&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Apr 
10 01:09:55 2009
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.CountingMap;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+
+public class FindQuantiles extends EvalFunc<Map<Object, Object>>{
+    // Keys of the map returned by exec(): QUANTILES_LIST maps to the bag of
+    // chosen quantile tuples, WEIGHTED_PARTS maps to the map of per-sample
+    // partition fractions.
+    public static final String QUANTILES_LIST = "quantiles.list";
+    public static final String WEIGHTED_PARTS = "weighted.parts";
+
+    // Factories used by exec() to build its output bag and tuples.
+    BagFactory mBagFactory = BagFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    // Per-column ascending flags parsed from the String[] constructor
+    // arguments; left unset by the no-arg constructor.
+    boolean[] mAsc;
+    // Summary of the flags in mAsc, used by SortComparator to choose how
+    // two tuples are compared.
+    enum State { ALL_ASC, ALL_DESC, MIXED };
+    State mState;
+    
+    /**
+     * Default tuple comparator. Depending on mState it compares whole
+     * tuples in natural order, in reversed natural order, or field by field
+     * honouring the per-column flags in mAsc.
+     */
+    private class SortComparator implements Comparator<Tuple> {
+        @SuppressWarnings("unchecked")
+        public int compare(Tuple t1, Tuple t2) {
+            switch (mState) {
+            case ALL_ASC:
+                return t1.compareTo(t2);
+            case ALL_DESC:
+                return t2.compareTo(t1);
+            case MIXED:
+                return compareMixed(t1, t2);
+            }
+            return -1; // keep the compiler happy
+        }
+
+        // Mixed ordering: a shorter tuple sorts first; tuples of equal size
+        // are decided by the first differing field, with the sign flipped
+        // for columns flagged as descending.
+        private int compareMixed(Tuple left, Tuple right) {
+            int leftSize = left.size();
+            int rightSize = right.size();
+            if (leftSize != rightSize) {
+                return rightSize < leftSize ? 1 : -1;
+            }
+            for (int col = 0; col < leftSize; col++) {
+                try {
+                    int result = DataType.compare(left.get(col), right.get(col));
+                    if (result == 0) {
+                        continue;
+                    }
+                    return mAsc[col] ? result : result * -1;
+                } catch (ExecException e) {
+                    throw new RuntimeException("Unable to compare tuples", e);
+                }
+            }
+            return 0;
+        }
+    }
+
+    // Comparator used by areEqual(); starts out as the built-in
+    // SortComparator and is replaced by instantiateFunc() when a user
+    // defined comparison function was specified.
+    private Comparator<Tuple> mComparator = new SortComparator();
+    // Spec of the user defined comparison function; set by the String[]
+    // constructor when its first argument carries the comparator marker.
+    private FuncSpec mUserComparisonFuncSpec;
+    // Instance created from mUserComparisonFuncSpec by instantiateFunc().
+    private ComparisonFunc mUserComparisonFunc;
+    
+    
+    /**
+     * Instantiates the user defined comparison function from its spec, hands
+     * it the reporter and installs it as the comparator used by this udf.
+     * Does nothing when no user comparator spec was supplied.
+     */
+    @SuppressWarnings("unchecked")
+    private void instantiateFunc() {
+        // Test the spec, not the function: the function is only ever
+        // created here, so guarding on it would make this body unreachable.
+        // NOTE(review): this path was dead before the guard was corrected;
+        // confirm the user comparator accepts plain Tuples when invoked
+        // through the Comparator interface.
+        if(mUserComparisonFuncSpec != null) {
+            this.mUserComparisonFunc = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.mUserComparisonFuncSpec);
+            this.mUserComparisonFunc.setReporter(reporter);
+            this.mComparator = mUserComparisonFunc;
+        }
+    }
+    
+    /**
+     * Deserialization hook: restores the default serialized state and then
+     * re-creates any user defined comparison function, since it has to be
+     * instantiated on the backend when the FindQuantiles udf is deserialized.
+     *
+     * @param is stream the object is being read from
+     * @throws IOException propagated from the default deserialization
+     * @throws ClassNotFoundException propagated from the default
+     *         deserialization
+     */
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc();
+    }
+    
+
+    /**
+     * Creates an instance that compares tuples in ascending natural order.
+     */
+    public FindQuantiles() {
+        mState = State.ALL_ASC;
+    }
+
+    /**
+     * Creates an instance from the constructor arguments assembled by the
+     * compiler.
+     *
+     * @param args an optional first element starting with
+     *        MRCompiler.USER_COMPARATOR_MARKER, whose remainder is the func
+     *        spec of a user defined comparator, followed by one flag per
+     *        sort column. Each flag is parsed with Boolean.parseBoolean;
+     *        true means ascending. An empty array means all ascending.
+     */
+    public FindQuantiles(String[] args) {
+        int startIndex = 0;
+        int ascFlagsLength = args.length;
+        String marker = MRCompiler.USER_COMPARATOR_MARKER;
+        // the first argument may be the information
+        // about user defined comparison function if one
+        // was specified; guard against an empty argument array
+        if(args.length > 0 && args[0].startsWith(marker)) {
+            mUserComparisonFuncSpec = new FuncSpec(
+                    args[0].substring(marker.length()));
+            // skip the first argument now that we used it
+            startIndex++;
+            ascFlagsLength--;
+        }
+        
+        mAsc = new boolean[ascFlagsLength];
+        boolean sawAsc = false;
+        boolean sawDesc = false;
+        // mAsc is indexed from 0 while the flags start at startIndex in
+        // args, so the two indices must be kept apart; otherwise the first
+        // slot stays unset and the last flag is never read when a
+        // comparator argument is present.
+        for (int i = 0; i < ascFlagsLength; i++) {
+            mAsc[i] = Boolean.parseBoolean(args[startIndex + i]);
+            if (mAsc[i]) sawAsc = true;
+            else sawDesc = true;
+        }
+        if (sawAsc && sawDesc) mState = State.MIXED;
+        else if (sawDesc) mState = State.ALL_DESC;
+        else mState = State.ALL_ASC; // In case they gave us no args this
+                                     // defaults to all ascending.
+    }
+
+    /**
+     * Picks the quantile boundaries out of a sorted sample and, for every
+     * boundary value that also occurs in later partitions, computes the
+     * fraction of its occurrences that fell into each partition.
+     *
+     * @param in tuple whose first field is the number of quantiles to
+     *        generate (an Integer; this is the reduce parallelism) and whose
+     *        second field is the *sorted* bag of samples (a DataBag)
+     * @return a map with two entries: QUANTILES_LIST maps to a bag of the
+     *         chosen quantile tuples, WEIGHTED_PARTS maps to a map from
+     *         sample tuple to a tuple of per-partition Float fractions;
+     *         null if the input tuple is null or empty
+     * @throws IOException declared by the signature; any failure inside the
+     *         computation is rethrown as a RuntimeException wrapping the
+     *         original cause
+     */
+    @Override
+    public Map<Object, Object> exec(Tuple in) throws IOException {
+        if(in==null || in.size()==0)
+            return null;
+        Map<Object, Object> output = new HashMap<Object, Object>();
+        ArrayList<Tuple> quantilesList = new ArrayList<Tuple>();
+        Map<Tuple,Tuple> weightedParts = new HashMap<Tuple, Tuple>();
+        // the sample file has a tuple as under:
+        // (numQuantiles, bag of samples) 
+        // numQuantiles here is the reduce parallelism
+        try{
+            int numQuantiles = (Integer)in.get(0);
+            DataBag samples = (DataBag)in.get(1);
+            
+            long numSamples = samples.size();
+            long toSkip = numSamples / numQuantiles;
+            if(toSkip == 0) {
+                // numSamples is < numQuantiles;
+                // set numQuantiles to numSamples
+                numQuantiles = (int)numSamples;
+                toSkip = 1;
+            }
+            
+            // first pass: every toSkip-th sample becomes a quantile boundary
+            long ind=0, j=-1, nextQuantile = toSkip-1;
+            for (Tuple it : samples) {
+                if (ind==nextQuantile){
+                    ++j;
+                    quantilesList.add(it);
+                    nextQuantile+=toSkip;
+                    if(j==numQuantiles-1)
+                        break;
+                }
+                ind++;
+                if (ind % 1000 == 0) progress();
+            }
+            
+            // second pass: count, per partition, the occurrences of samples
+            // which are equal to a quantile boundary
+            long i=-1;
+            Map<Tuple,CountingMap<Integer>> contribs =
+                new HashMap<Tuple, CountingMap<Integer>>();
+            for (Tuple it : samples){
+                ++i;
+                if (i % 1000 == 0) progress();
+                int partInd = (int)(i/toSkip); // which partition
+                if(partInd==numQuantiles) break;
+                // the quantiles list has the element from the sample which
+                // is the last element for a given partition. For example:
+                // if numQuantiles is 5 and number of samples is 100, then
+                // toSkip = 20
+                // quantiles[0] = sample[19] // the 20th element
+                // quantiles[1] = sample[39] // the 40th element
+                // and so on. For any element in the sample between 0 and
+                // 19, partInd will be 0. We want to check if a sample
+                // element which is present between 0 and 19 is also the
+                // 19th (quantiles[0] element). This would mean that element
+                // might spread over the 0th and 1st partition. We are
+                // looking for contributions to a partition from such
+                // elements.
+                
+                // First we only check for sample elements in partitions
+                // other than the last one
+                // < numQuantiles -1 (partInd is 0 indexed). 
+                if(partInd<numQuantiles-1 && areEqual(it,quantilesList.get(partInd))){
+                    if(!contribs.containsKey(it)){
+                        CountingMap<Integer> cm = new CountingMap<Integer>();
+                        cm.put(partInd, 1);
+                        contribs.put(it, cm);
+                    }
+                    else
+                        contribs.get(it).put(partInd, 1);
+                }
+                else{ 
+                    // we are either in the last partition (last quantile)
+                    // OR the sample element we are currently processing is
+                    // not the same as the element in the quantile list for
+                    // this partition. If we haven't seen this sample item
+                    // earlier, this is not an element which crosses
+                    // partitions - so ignore
+                    if(!contribs.containsKey(it))
+                        continue;
+                    else
+                        // we have seen this sample before (in a previous
+                        // partInd), add to the contribution associated with
+                        // this sample - if we had not seen this sample in a
+                        // previous partInd, then we would have not had this
+                        // in the contribs map! (because of the if above).
+                        // This "key" (represented by the sample item) can
+                        // either go to the previous partInd or this partInd
+                        // in the final sort reduce stage. That is where the
+                        // amount of contribution to each partInd will
+                        // matter and influence the choice.
+                        contribs.get(it).put(partInd, 1);
+                }
+            }
+            
+            // third pass: turn the per-partition counts into fractions
+            int k = 0;
+            for(Entry<Tuple, CountingMap<Integer>> ent : contribs.entrySet()){
+                if (k % 1000 == 0) progress();
+                // advance the counter so progress is reported once per 1000
+                // entries rather than on every entry
+                k++;
+                Tuple key = ent.getKey(); // sample item which repeats
+                
+                // this map will have the contributions of the sample item
+                // to the different partitions
+                CountingMap<Integer> value = ent.getValue(); 
+                
+                long total = value.getTotalCount();
+                Tuple probVec = mTupleFactory.newTuple(numQuantiles);
+                // initialize all contribution fractions for different
+                // partitions to 0.0
+                for (int l = 0; l < numQuantiles; l++) {
+                    probVec.set(l, Float.valueOf(0.0f));
+                }
+                // for each partition that this sample item is present in,
+                // compute the fraction of the total occurrences for that
+                // partition - this will be the probability with which we
+                // will pick this partition in the final sort reduce job
+                // for this sample item
+                for (Entry<Integer,Integer> valEnt : value.entrySet()) {
+                    probVec.set(valEnt.getKey(), (float)valEnt.getValue()/total);
+                }
+                weightedParts.put(key, probVec);
+            }
+            output.put(QUANTILES_LIST, mBagFactory.newDefaultBag(quantilesList));
+            output.put(WEIGHTED_PARTS, weightedParts);
+            return output;
+        }catch (Exception e){
+            // the cause travels with the rethrown exception, so it is not
+            // printed separately here
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Tells whether the currently installed comparator considers the two
+     * tuples equal.
+     *
+     * @param it first tuple
+     * @param tuple second tuple
+     * @return true if the comparator returns 0 for the pair
+     */
+    private boolean areEqual(Tuple it, Tuple tuple) {
+        int ordering = mComparator.compare(it, tuple);
+        return ordering == 0;
+    }
+}


Reply via email to