Author: pradeepkth Date: Fri Apr 10 01:09:55 2009 New Revision: 763847 URL: http://svn.apache.org/viewvc?rev=763847&view=rev Log: PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs "FileSystem closed" error on large input
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=763847&r1=763846&r2=763847&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Fri Apr 10 01:09:55 2009 @@ -28,6 +28,9 @@ BUG FIXES +PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs +"FileSystem closed" error on large input (pradeepkth) + PIG-693: Parameter to UDF which is an alias returned in another UDF in nested foreach causes incorrect results (thejas via sms) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=763847&r1=763846&r2=763847&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Apr 10 01:09:55 2009 @@ -36,6 +36,7 @@ import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.RandomSampleLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; @@ -149,6 +150,8 @@ private CompilationMessageCollector messageCollector = null; + public static String 
USER_COMPARATOR_MARKER = "user.comparator.func:"; + public MRCompiler(PhysicalPlan plan) throws MRCompilerException { this(plan,null); } @@ -1312,10 +1315,55 @@ mro.reducePlan.add(nfe2); mro.reducePlan.connect(pkg, nfe2); + // Let's connect the output from the foreach containing + // number of quantiles and the sorted bag of samples to + // another foreach with the FindQuantiles udf. The input + // to the FindQuantiles udf is a project(*) which takes the + // foreach input and gives it to the udf + PhysicalPlan ep4 = new PhysicalPlan(); + POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prjStar4.setResultType(DataType.TUPLE); + prjStar4.setStar(true); + ep4.add(prjStar4); + + List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>(); + ufInps.add(prjStar4); + // Turn the asc/desc array into an array of strings so that we can pass it + // to the FindQuantiles function. + List<Boolean> ascCols = inpSort.getMAscCols(); + String[] ascs = new String[ascCols.size()]; + for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString(); + // check if user defined comparator is used in the sort, if so + // prepend the name of the comparator as the first fields in the + // constructor args array to the FindQuantiles udf + String[] ctorArgs = ascs; + if(sort.isUDFComparatorUsed) { + String userComparatorFuncSpec = sort.getMSortFunc().getFuncSpec().toString(); + ctorArgs = new String[ascs.length + 1]; + ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec; + for(int j = 0; j < ascs.length; j++) { + ctorArgs[j+1] = ascs[j]; + } + } + + POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, + new FuncSpec(FindQuantiles.class.getName(), ctorArgs)); + ep4.add(uf); + ep4.connect(prjStar4, uf); + + List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>(); + ep4s.add(ep4); + List<Boolean> flattened3 = new ArrayList<Boolean>(); + flattened3.add(false); + POForEach nfe3 = new 
POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3); + + mro.reducePlan.add(nfe3); + mro.reducePlan.connect(nfe2, nfe3); + POStore str = getStore(); str.setSFile(quantFile); mro.reducePlan.add(str); - mro.reducePlan.connect(nfe2, str); + mro.reducePlan.connect(nfe3, str); mro.setReduceDone(true); mro.requestedParallelism = 1; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=763847&r1=763846&r2=763847&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Apr 10 01:09:55 2009 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -29,12 +30,14 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullableBytesWritable; @@ -68,6 +71,7 @@ return gen.getNext(); } + @SuppressWarnings("unchecked") 
public void configure(JobConf job) { this.job = job; String quantilesFile = job.get("pig.quantilesFile", ""); @@ -78,120 +82,41 @@ try{ InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job)); BinStorage loader = new BinStorage(); - ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>(); + DataBag quantilesList; loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); Tuple t = loader.getNext(); if(t==null) throw new RuntimeException("Empty samples file"); // the Quantiles file has a tuple as under: // (numQuantiles, bag of samples) // numQuantiles here is the reduce parallelism - numQuantiles = (Integer) t.get(0); - samples = (DataBag) t.get(1); - long numSamples = samples.size(); - long toSkip = numSamples / numQuantiles; - if(toSkip == 0) { - // numSamples is < numQuantiles; - // set numQuantiles to numSamples - numQuantiles = (int)numSamples; - toSkip = 1; - } - - long ind=0, j=-1, nextQuantile = toSkip-1; - for (Tuple it : samples) { - if (ind==nextQuantile){ - ++j; - quantilesList.add(getPigNullableWritable(it)); - nextQuantile+=toSkip; - if(j==numQuantiles-1) - break; - } - ind++; - //TODO how do we report progress? - //if (i % 1000 == 0) progress(); - // Currently there is no way to report progress since - // in configure() we cannot get a handle to the reporter - // (even PhysicalOperator.getReporter() does not work! It is - // set to null.) Hopefully the work done here wll be < 10 minutes - // since we are dealing with 100* num_mapper samples. When - // RandomSampleLoader becomes an operator or UDF instead of a - // loader hopefully we can intelligently decide on the number - // of samples (instead of the static 100) and then not being - // able to report progress may not be a big issue. 
- } + Map<String, Object> quantileMap = (Map<String, Object>) t.get(0); + quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST); + Map<Tuple, Tuple> weightedPartsData = (Map<Tuple, Tuple>) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); convertToArray(quantilesList); - long i=-1; - Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>(); - for (Tuple it : samples){ - ++i; - PigNullableWritable sample = getPigNullableWritable(it); - int partInd = new Long(i/toSkip).intValue(); // which partition - if(partInd==numQuantiles) break; - // the quantiles array has the element from the sample which is the - // last element for a given partition. For example: if numQunatiles - // is 5 and number of samples is 100, then toSkip = 20 - // quantiles[0] = sample[19] // the 20th element - // quantiles[1] = sample[39] // the 40th element - // and so on. For any element in the sample between 0 and 19, partInd - // will be 0. We want to check if a sample element which is - // present between 0 and 19 and is also the 19th (quantiles[0] element). - // This would mean that element might spread over the 0th and 1st - // partition. We are looking for contributions to a partition - // from such elements. - - // First We only check for sample elements in partitions other than the last one - // < numQunatiles -1 (partInd is 0 indexed). 
- if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){ - if(!contribs.containsKey(sample)){ - CountingMap<Integer> cm = new CountingMap<Integer>(); - cm.put(partInd, 1); - contribs.put(sample, cm); - } - else - contribs.get(sample).put(partInd, 1); - } - else{ - // we are either in the last partition (last quantile) - // OR the sample element we are currently processing is not - // the same as the element in the quantile array for this partition - // if we haven't seen this sample item earlier, this is not an - // element which crosses partitions - so ignore - if(!contribs.containsKey(sample)) - continue; - else - // we have seen this sample before (in a previous partInd), - // add to the contribution associated with this sample - if we had - // not seen this sample in a previous partInd, then we have not - // had this in the contribs map! (because of the if above).This - // sample can either go to the previous partInd or this partInd - // in the final sort reduce stage. That is where the amount of - // contribution to each partInd will matter and influence the choice. 
- contribs.get(sample).put(partInd, 1); - } - } - for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){ - PigNullableWritable key = ent.getKey(); // sample item which repeats - - // this map will have the contributions of the sample item to the different partitions - CountingMap<Integer> value = ent.getValue(); - - long total = value.getTotalCount(); - float[] probVec = new float[numQuantiles]; - // for each partition that this sample item is present in, - // compute the fraction of the total occurences for that - // partition - this will be the probability with which we - // will pick this partition in the final sort reduce job - // for this sample item - for (Entry<Integer,Integer> valEnt : value.entrySet()) { - probVec[valEnt.getKey()] = (float)valEnt.getValue()/total; - } -// weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec)); - weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec)); + for(Entry<Tuple, Tuple> ent : weightedPartsData.entrySet()){ + Tuple key = ent.getKey(); // sample item which repeats + float[] probVec = getProbVec(ent.getValue()); + weightedParts.put(getPigNullableWritable(key), + new DiscreteProbabilitySampleGenerator(probVec)); } }catch (Exception e){ throw new RuntimeException(e); } } + /** + * @param value + * @return + * @throws ExecException + */ + private float[] getProbVec(Tuple values) throws ExecException { + float[] probVec = new float[values.size()]; + for(int i = 0; i < values.size(); i++) { + probVec[i] = (Float)values.get(i); + } + return probVec; + } + private PigNullableWritable getPigNullableWritable(Tuple t) { try { // user comparators work with tuples - so if user comparator @@ -219,24 +144,38 @@ } private void convertToArray( - ArrayList<PigNullableWritable> q) { + DataBag quantilesListAsBag) { + ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag); if ("true".equals(job.get("pig.usercomparator")) || - 
q.get(0).getClass().equals(NullableTuple.class)) { - quantiles = q.toArray(new NullableTuple[0]); - } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) { - quantiles = q.toArray(new NullableBytesWritable[0]); - } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) { - quantiles = q.toArray(new NullableDoubleWritable[0]); - } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) { - quantiles = q.toArray(new NullableFloatWritable[0]); - } else if (q.get(0).getClass().equals(NullableIntWritable.class)) { - quantiles = q.toArray(new NullableIntWritable[0]); - } else if (q.get(0).getClass().equals(NullableLongWritable.class)) { - quantiles = q.toArray(new NullableLongWritable[0]); - } else if (q.get(0).getClass().equals(NullableText.class)) { - quantiles = q.toArray(new NullableText[0]); + quantilesList.get(0).getClass().equals(NullableTuple.class)) { + quantiles = quantilesList.toArray(new NullableTuple[0]); + } else if (quantilesList.get(0).getClass().equals(NullableBytesWritable.class)) { + quantiles = quantilesList.toArray(new NullableBytesWritable[0]); + } else if (quantilesList.get(0).getClass().equals(NullableDoubleWritable.class)) { + quantiles = quantilesList.toArray(new NullableDoubleWritable[0]); + } else if (quantilesList.get(0).getClass().equals(NullableFloatWritable.class)) { + quantiles = quantilesList.toArray(new NullableFloatWritable[0]); + } else if (quantilesList.get(0).getClass().equals(NullableIntWritable.class)) { + quantiles = quantilesList.toArray(new NullableIntWritable[0]); + } else if (quantilesList.get(0).getClass().equals(NullableLongWritable.class)) { + quantiles = quantilesList.toArray(new NullableLongWritable[0]); + } else if (quantilesList.get(0).getClass().equals(NullableText.class)) { + quantiles = quantilesList.toArray(new NullableText[0]); } else { throw new RuntimeException("Unexpected class in " + this.getClass().getSimpleName()); } } + + /** + * @param quantilesListAsBag + * @return + */ 
+ private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) { + + ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>(); + for (Tuple tuple : quantilesListAsBag) { + list.add(getPigNullableWritable(tuple)); + } + return list; + } } Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=763847&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Apr 10 01:09:55 2009 @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.CountingMap;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+
+/**
+ * UDF that reduces a sorted bag of samples to the data needed for range
+ * partitioning: the quantile boundary tuples, and, for every sample key that
+ * straddles more than one partition, the fraction of its occurrences seen in
+ * each partition.
+ *
+ * The result of {@link #exec(Tuple)} is a map with two entries:
+ * {@link #QUANTILES_LIST} (a bag of boundary tuples) and
+ * {@link #WEIGHTED_PARTS} (a {@code Map<Tuple, Tuple>} from sample key to a
+ * tuple of {@code Float} fractions, one per partition).
+ */
+public class FindQuantiles extends EvalFunc<Map<Object, Object>>{
+    // keys for the weightedparts Map
+    public static final String QUANTILES_LIST = "quantiles.list";
+    public static final String WEIGHTED_PARTS = "weighted.parts";
+
+    BagFactory mBagFactory = BagFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    // per-column sort direction, true meaning ascending; null when the
+    // no-arg constructor was used
+    boolean[] mAsc;
+    enum State { ALL_ASC, ALL_DESC, MIXED };
+    State mState;
+
+    /**
+     * Default tuple comparator. Honours the sort directions captured in
+     * {@code mAsc} / {@code mState} of the enclosing instance.
+     */
+    private class SortComparator implements Comparator<Tuple> {
+        @SuppressWarnings("unchecked")
+        public int compare(Tuple t1, Tuple t2) {
+            switch (mState) {
+            case ALL_ASC:
+                return t1.compareTo(t2);
+
+            case ALL_DESC:
+                return t2.compareTo(t1);
+
+            case MIXED:
+                // Have to break the tuple down and compare it field to field.
+                int sz1 = t1.size();
+                int sz2 = t2.size();
+                if (sz2 < sz1) {
+                    return 1;
+                } else if (sz2 > sz1) {
+                    return -1;
+                } else {
+                    for (int i = 0; i < sz1; i++) {
+                        try {
+                            int c = DataType.compare(t1.get(i), t2.get(i));
+                            if (c != 0) {
+                                // flip the result for descending columns
+                                if (!mAsc[i]) c *= -1;
+                                return c;
+                            }
+                        } catch (ExecException e) {
+                            throw new RuntimeException("Unable to compare tuples", e);
+                        }
+                    }
+                    return 0;
+                }
+            }
+            return -1; // keep the compiler happy
+        }
+    }
+
+    private Comparator<Tuple> mComparator = new SortComparator();
+    private FuncSpec mUserComparisonFuncSpec;
+    private ComparisonFunc mUserComparisonFunc;
+
+
+    /**
+     * Instantiates the user defined comparison function, if a spec for one
+     * was supplied to the constructor, and installs it as the comparator
+     * used by {@link #areEqual(Tuple, Tuple)}.
+     */
+    @SuppressWarnings("unchecked")
+    private void instantiateFunc() {
+        // The guard must test the spec, not the function: the function is
+        // only ever assigned inside this branch, so testing it for non-null
+        // made the branch unreachable.
+        if(mUserComparisonFuncSpec != null) {
+            this.mUserComparisonFunc = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.mUserComparisonFuncSpec);
+            this.mUserComparisonFunc.setReporter(reporter);
+            this.mComparator = mUserComparisonFunc;
+        }
+    }
+
+    // We need to instantiate any user defined comparison function
+    // on the backend when the FindQuantiles udf is deserialized
+    // NOTE(review): this hook only runs when the instance is created through
+    // Java deserialization - confirm that is how the backend obtains it,
+    // otherwise the user comparator is never installed.
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException{
+        is.defaultReadObject();
+        instantiateFunc();
+    }
+
+
+    /**
+     * Creates an instance that treats every sort column as ascending.
+     */
+    public FindQuantiles() {
+        mState = State.ALL_ASC;
+    }
+
+    /**
+     * @param args optional user comparator spec (an entry prefixed with
+     * {@link MRCompiler#USER_COMPARATOR_MARKER}) followed by one
+     * "true"/"false" string per sort column, "true" meaning ascending.
+     * A null or empty array defaults to all ascending.
+     */
+    public FindQuantiles(String[] args) {
+        int startIndex = 0;
+        int ascFlagsLength = (args == null) ? 0 : args.length;
+        // the first argument may be the information
+        // about user defined comparison function if one
+        // was specified
+        if(ascFlagsLength > 0 && args[0].startsWith(MRCompiler.USER_COMPARATOR_MARKER)) {
+            mUserComparisonFuncSpec = new FuncSpec(
+                    args[0].substring(MRCompiler.USER_COMPARATOR_MARKER.length()));
+            // skip the first argument now that we used it
+            startIndex++;
+            ascFlagsLength--;
+        }
+
+        mAsc = new boolean[ascFlagsLength];
+        boolean sawAsc = false;
+        boolean sawDesc = false;
+        // mAsc is indexed from 0 while the flags start at startIndex in args;
+        // using one index for both skipped mAsc[0] and dropped the last flag
+        // whenever a comparator marker was present.
+        for (int i = 0; i < ascFlagsLength; i++) {
+            mAsc[i] = Boolean.parseBoolean(args[startIndex + i]);
+            if (mAsc[i]) sawAsc = true;
+            else sawDesc = true;
+        }
+        if (sawAsc && sawDesc) mState = State.MIXED;
+        else if (sawDesc) mState = State.ALL_DESC;
+        else mState = State.ALL_ASC; // In case they gave us no args this
+                                     // defaults to all ascending.
+    }
+
+    /**
+     * first field in the input tuple is the number of quantiles to generate
+     * second field is the *sorted* bag of samples
+     *
+     * @param in tuple of (numQuantiles, sorted bag of samples)
+     * @return null if {@code in} is null or empty; otherwise a map holding the
+     * bag of quantile boundaries under {@link #QUANTILES_LIST} and the map of
+     * per-key partition fractions under {@link #WEIGHTED_PARTS}
+     * @throws RuntimeException wrapping any failure while reading the input
+     */
+    @Override
+    public Map<Object, Object> exec(Tuple in) throws IOException {
+        if(in==null || in.size()==0)
+            return null;
+        ArrayList<Tuple> quantilesList = new ArrayList<Tuple>();
+        Map<Tuple,Tuple> weightedParts = new HashMap<Tuple, Tuple>();
+        // the sample file has a tuple as under:
+        // (numQuantiles, bag of samples)
+        // numQuantiles here is the reduce parallelism
+        try{
+            int numQuantiles = (Integer)in.get(0);
+            DataBag samples = (DataBag)in.get(1);
+
+            long numSamples = samples.size();
+            long toSkip = numSamples / numQuantiles;
+            if(toSkip == 0) {
+                // numSamples is < numQuantiles;
+                // set numQuantiles to numSamples
+                numQuantiles = (int)numSamples;
+                toSkip = 1;
+            }
+
+            // First pass: every toSkip-th sample becomes a quantile boundary,
+            // until numQuantiles boundaries have been collected.
+            long ind = 0;
+            long nextQuantile = toSkip - 1;
+            for (Tuple it : samples) {
+                if (ind == nextQuantile) {
+                    quantilesList.add(it);
+                    nextQuantile += toSkip;
+                    if (quantilesList.size() == numQuantiles)
+                        break;
+                }
+                ind++;
+                if (ind % 1000 == 0) progress();
+            }
+
+            // Second pass: find sample items that spread over more than one
+            // partition and tally their occurrences per partition.
+            long i = -1;
+            Map<Tuple,CountingMap<Integer>> contribs = new HashMap<Tuple, CountingMap<Integer>>();
+            for (Tuple it : samples){
+                ++i;
+                if (i % 1000 == 0) progress();
+                int partInd = (int)(i / toSkip); // which partition
+                if(partInd==numQuantiles) break;
+                // the quantiles array has the element from the sample which is the
+                // last element for a given partition. For example: if numQuantiles
+                // is 5 and number of samples is 100, then toSkip = 20
+                // quantiles[0] = sample[19] // the 20th element
+                // quantiles[1] = sample[39] // the 40th element
+                // and so on. For any element in the sample between 0 and 19, partInd
+                // will be 0. We want to check if a sample element which is
+                // present between 0 and 19 is also the 19th (quantiles[0] element).
+                // This would mean that element might spread over the 0th and 1st
+                // partition. We are looking for contributions to a partition
+                // from such elements.
+
+                // We only check for sample elements in partitions other than the last one
+                // < numQuantiles -1 (partInd is 0 indexed).
+                boolean equalsBoundary = partInd < numQuantiles - 1
+                        && areEqual(it, quantilesList.get(partInd));
+                CountingMap<Integer> cm = contribs.get(it);
+                if (cm == null) {
+                    // we are either in the last partition (last quantile)
+                    // OR the sample element is not the same as the boundary
+                    // element for this partition; since we have not seen it
+                    // earlier, it does not cross partitions - so ignore
+                    if (!equalsBoundary)
+                        continue;
+                    cm = new CountingMap<Integer>();
+                    contribs.put(it, cm);
+                }
+                // Either the item equals this partition's boundary, or it was
+                // already recorded for a previous partInd. This "key" can go to
+                // any of the partitions it was seen in during the final sort
+                // reduce stage; the contribution to each partInd influences
+                // that choice.
+                cm.put(partInd, 1);
+            }
+
+            // Third pass: turn the per-partition tallies into fractions.
+            int k = 0;
+            for(Entry<Tuple, CountingMap<Integer>> ent : contribs.entrySet()){
+                // k has to advance, otherwise progress() fires on every entry
+                if (k++ % 1000 == 0) progress();
+                Tuple key = ent.getKey(); // sample item which repeats
+
+                // this map will have the contributions of the sample item to the different partitions
+                CountingMap<Integer> value = ent.getValue();
+
+                long total = value.getTotalCount();
+                Tuple probVec = mTupleFactory.newTuple(numQuantiles);
+                // initialize all contribution fractions for different
+                // partitions to 0.0
+                for (int l = 0; l < numQuantiles; l++) {
+                    probVec.set(l, Float.valueOf(0.0f));
+                }
+                // for each partition that this sample item is present in,
+                // compute the fraction of the total occurrences for that
+                // partition - this will be the probability with which we
+                // will pick this partition in the final sort reduce job
+                // for this sample item
+                for (Entry<Integer,Integer> valEnt : value.entrySet()) {
+                    probVec.set(valEnt.getKey(), (float)valEnt.getValue()/total);
+                }
+                weightedParts.put(key, probVec);
+            }
+
+            Map<Object, Object> output = new HashMap<Object, Object>();
+            output.put(QUANTILES_LIST, mBagFactory.newDefaultBag(quantilesList));
+            output.put(WEIGHTED_PARTS, weightedParts);
+            return output;
+        }catch (Exception e){
+            // the cause travels with the rethrown exception, so no separate
+            // stack trace dump is needed here
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return true if the active comparator (default or user defined)
+     * considers the two tuples equal
+     */
+    private boolean areEqual(Tuple it, Tuple tuple) {
+        return mComparator.compare(it, tuple)==0;
+    }
+}