Author: pradeepkth
Date: Fri Apr 10 01:09:55 2009
New Revision: 763847
URL: http://svn.apache.org/viewvc?rev=763847&view=rev
Log:
PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
"FileSystem closed" error on large input
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Apr 10 01:09:55 2009
@@ -28,6 +28,9 @@
BUG FIXES
+PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
+"FileSystem closed" error on large input (pradeepkth)
+
PIG-693: Parameter to UDF which is an alias returned in another UDF in nested
foreach causes incorrect results (thejas via sms)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Apr 10 01:09:55 2009
@@ -36,6 +36,7 @@
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -149,6 +150,8 @@
private CompilationMessageCollector messageCollector = null;
+ public static String USER_COMPARATOR_MARKER = "user.comparator.func:";
+
public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
this(plan,null);
}
@@ -1312,10 +1315,55 @@
mro.reducePlan.add(nfe2);
mro.reducePlan.connect(pkg, nfe2);
+ // Let's connect the output from the foreach containing
+ // number of quantiles and the sorted bag of samples to
+ // another foreach with the FindQuantiles udf. The input
+ // to the FindQuantiles udf is a project(*) which takes the
+ // foreach input and gives it to the udf
+ PhysicalPlan ep4 = new PhysicalPlan();
+ POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar4.setResultType(DataType.TUPLE);
+ prjStar4.setStar(true);
+ ep4.add(prjStar4);
+
+ List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+ ufInps.add(prjStar4);
+ // Turn the asc/desc array into an array of strings so that we can pass it
+ // to the FindQuantiles function.
+ List<Boolean> ascCols = inpSort.getMAscCols();
+ String[] ascs = new String[ascCols.size()];
+ for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
+ // check if user defined comparator is used in the sort, if so
+ // prepend the name of the comparator as the first fields in the
+ // constructor args array to the FindQuantiles udf
+ String[] ctorArgs = ascs;
+ if(sort.isUDFComparatorUsed) {
+ String userComparatorFuncSpec = sort.getMSortFunc().getFuncSpec().toString();
+ ctorArgs = new String[ascs.length + 1];
+ ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
+ for(int j = 0; j < ascs.length; j++) {
+ ctorArgs[j+1] = ascs[j];
+ }
+ }
+
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
+ new FuncSpec(FindQuantiles.class.getName(), ctorArgs));
+ ep4.add(uf);
+ ep4.connect(prjStar4, uf);
+
+ List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+ ep4s.add(ep4);
+ List<Boolean> flattened3 = new ArrayList<Boolean>();
+ flattened3.add(false);
+ POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+
+ mro.reducePlan.add(nfe3);
+ mro.reducePlan.connect(nfe2, nfe3);
+
POStore str = getStore();
str.setSFile(quantFile);
mro.reducePlan.add(str);
- mro.reducePlan.connect(nfe2, str);
+ mro.reducePlan.connect(nfe3, str);
mro.setReduceDone(true);
mro.requestedParallelism = 1;
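For reference, a sketch of the constructor-argument convention this hunk sets up (buildCtorArgs is illustrative, not committed code; "user.comparator.func:" is the committed USER_COMPARATOR_MARKER value): the asc/desc flags are passed as strings, and when a user-defined comparator is in use its FuncSpec string is prepended, tagged with the marker so FindQuantiles can detect it.

    import java.util.List;

    public class CtorArgsSketch {
        // Mirrors how MRCompiler assembles FindQuantiles constructor args.
        static String[] buildCtorArgs(List<Boolean> ascCols, String userComparatorFuncSpec) {
            String[] ascs = new String[ascCols.size()];
            for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
            if (userComparatorFuncSpec == null) return ascs;
            String[] ctorArgs = new String[ascs.length + 1];
            ctorArgs[0] = "user.comparator.func:" + userComparatorFuncSpec;
            System.arraycopy(ascs, 0, ctorArgs, 1, ascs.length);
            return ctorArgs;
        }
    }

For example, an order-by on two ascending columns with a (made-up) comparator com.example.MyComp would yield {"user.comparator.func:com.example.MyComp", "true", "true"}.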
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=763847&r1=763846&r2=763847&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Apr 10 01:09:55 2009
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -29,12 +30,14 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullableBytesWritable;
@@ -68,6 +71,7 @@
return gen.getNext();
}
+ @SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.job = job;
String quantilesFile = job.get("pig.quantilesFile", "");
@@ -78,120 +82,41 @@
try{
InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
BinStorage loader = new BinStorage();
- ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>();
+ DataBag quantilesList;
loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple t = loader.getNext();
if(t==null) throw new RuntimeException("Empty samples file");
// the Quantiles file has a tuple as under:
// (numQuantiles, bag of samples)
// numQuantiles here is the reduce parallelism
- numQuantiles = (Integer) t.get(0);
- samples = (DataBag) t.get(1);
- long numSamples = samples.size();
- long toSkip = numSamples / numQuantiles;
- if(toSkip == 0) {
- // numSamples is < numQuantiles;
- // set numQuantiles to numSamples
- numQuantiles = (int)numSamples;
- toSkip = 1;
- }
-
- long ind=0, j=-1, nextQuantile = toSkip-1;
- for (Tuple it : samples) {
- if (ind==nextQuantile){
- ++j;
- quantilesList.add(getPigNullableWritable(it));
- nextQuantile+=toSkip;
- if(j==numQuantiles-1)
- break;
- }
- ind++;
- //TODO how do we report progress?
- //if (i % 1000 == 0) progress();
- // Currently there is no way to report progress since
- // in configure() we cannot get a handle to the reporter
- // (even PhysicalOperator.getReporter() does not work! It is
- // set to null.) Hopefully the work done here wll be < 10 minutes
- // since we are dealing with 100* num_mapper samples. When
- // RandomSampleLoader becomes an operator or UDF instead of a
- // loader hopefully we can intelligently decide on the number
- // of samples (instead of the static 100) and then not being
- // able to report progress may not be a big issue.
- }
+ Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
+ quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+ Map<Tuple, Tuple> weightedPartsData = (Map<Tuple, Tuple>) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
- long i=-1;
- Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>();
- for (Tuple it : samples){
- ++i;
- PigNullableWritable sample = getPigNullableWritable(it);
- int partInd = new Long(i/toSkip).intValue(); // which partition
- if(partInd==numQuantiles) break;
- // the quantiles array has the element from the sample which is the
- // last element for a given partition. For example: if numQunatiles
- // is 5 and number of samples is 100, then toSkip = 20
- // quantiles[0] = sample[19] // the 20th element
- // quantiles[1] = sample[39] // the 40th element
- // and so on. For any element in the sample between 0 and 19, partInd
- // will be 0. We want to check if a sample element which is
- // present between 0 and 19 and is also the 19th (quantiles[0] element).
- // This would mean that element might spread over the 0th and 1st
- // partition. We are looking for contributions to a partition
- // from such elements.
-
- // First We only check for sample elements in partitions other than the last one
- // < numQunatiles -1 (partInd is 0 indexed).
- if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){
- if(!contribs.containsKey(sample)){
- CountingMap<Integer> cm = new CountingMap<Integer>();
- cm.put(partInd, 1);
- contribs.put(sample, cm);
- }
- else
- contribs.get(sample).put(partInd, 1);
- }
- else{
- // we are either in the last partition (last quantile)
- // OR the sample element we are currently processing is not
- // the same as the element in the quantile array for this partition
- // if we haven't seen this sample item earlier, this is not an
- // element which crosses partitions - so ignore
- if(!contribs.containsKey(sample))
- continue;
- else
- // we have seen this sample before (in a previous partInd),
- // add to the contribution associated with this sample - if we had
- // not seen this sample in a previous partInd, then we have not
- // had this in the contribs map! (because of the if above).This
- // sample can either go to the previous partInd or this partInd
- // in the final sort reduce stage. That is where the amount of
- // contribution to each partInd will matter and influence the choice.
- contribs.get(sample).put(partInd, 1);
- }
- }
- for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){
- PigNullableWritable key = ent.getKey(); // sample item which repeats
-
- // this map will have the contributions of the sample item to the different partitions
- CountingMap<Integer> value = ent.getValue();
-
- long total = value.getTotalCount();
- float[] probVec = new float[numQuantiles];
- // for each partition that this sample item is present in,
- // compute the fraction of the total occurences for that
- // partition - this will be the probability with which we
- // will pick this partition in the final sort reduce job
- // for this sample item
- for (Entry<Integer,Integer> valEnt : value.entrySet()) {
- probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
- }
-// weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec));
- weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec));
+ for(Entry<Tuple, Tuple> ent : weightedPartsData.entrySet()){
+ Tuple key = ent.getKey(); // sample item which repeats
+ float[] probVec = getProbVec(ent.getValue());
+ weightedParts.put(getPigNullableWritable(key),
+ new DiscreteProbabilitySampleGenerator(probVec));
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
+ /**
+ * @param values tuple of per-partition contribution fractions for one sample key
+ * @return the fractions as a float[] probability vector
+ * @throws ExecException if a field cannot be read from the tuple
+ */
+ private float[] getProbVec(Tuple values) throws ExecException {
+ float[] probVec = new float[values.size()];
+ for(int i = 0; i < values.size(); i++) {
+ probVec[i] = (Float)values.get(i);
+ }
+ return probVec;
+ }
+
private PigNullableWritable getPigNullableWritable(Tuple t) {
try {
// user comparators work with tuples - so if user comparator
@@ -219,24 +144,38 @@
}
private void convertToArray(
- ArrayList<PigNullableWritable> q) {
+ DataBag quantilesListAsBag) {
+ ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
if ("true".equals(job.get("pig.usercomparator")) ||
- q.get(0).getClass().equals(NullableTuple.class)) {
- quantiles = q.toArray(new NullableTuple[0]);
- } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
- quantiles = q.toArray(new NullableBytesWritable[0]);
- } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
- quantiles = q.toArray(new NullableDoubleWritable[0]);
- } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
- quantiles = q.toArray(new NullableFloatWritable[0]);
- } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
- quantiles = q.toArray(new NullableIntWritable[0]);
- } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
- quantiles = q.toArray(new NullableLongWritable[0]);
- } else if (q.get(0).getClass().equals(NullableText.class)) {
- quantiles = q.toArray(new NullableText[0]);
+ quantilesList.get(0).getClass().equals(NullableTuple.class)) {
+ quantiles = quantilesList.toArray(new NullableTuple[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableBytesWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableBytesWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableDoubleWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableDoubleWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableFloatWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableFloatWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableIntWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableIntWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableLongWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableLongWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableText.class)) {
+ quantiles = quantilesList.toArray(new NullableText[0]);
} else {
throw new RuntimeException("Unexpected class in " +
this.getClass().getSimpleName());
}
}
+
+ /**
+ * @param quantilesListAsBag bag of sample tuples holding the quantile boundaries
+ * @return the samples converted to PigNullableWritable keys
+ */
+ private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
+
+ ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>();
+ for (Tuple tuple : quantilesListAsBag) {
+ list.add(getPigNullableWritable(tuple));
+ }
+ return list;
+ }
}
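The getPartition() side is unchanged by this patch; only the gen.getNext() call is visible in the first hunk. For orientation, a sketch of how the two structures built in configure() would be consulted, under the assumption that non-repeating keys are routed by a binary search over the quantile boundaries (the real class uses DiscreteProbabilitySampleGenerator; a stub stands in for it here):

    import java.util.Arrays;
    import java.util.Map;

    public class PartitionLookupSketch {
        // Sketch only: keys equal to a boundary that straddles partitions are
        // routed via the weighted generator; everything else by binary search.
        static int pickPartition(Comparable key, Comparable[] quantiles,
                Map<Object, DiscreteProbabilitySampleGeneratorStub> weightedParts) {
            DiscreteProbabilitySampleGeneratorStub gen = weightedParts.get(key);
            if (gen != null) {
                // Weighted random choice among the partitions this key spans,
                // using the probability vector computed by FindQuantiles.
                return gen.getNext();
            }
            int idx = Arrays.binarySearch(quantiles, key);
            // binarySearch returns -(insertionPoint) - 1 for an absent key.
            return idx < 0 ? -(idx + 1) : idx;
        }

        // Stand-in for the real DiscreteProbabilitySampleGenerator.
        interface DiscreteProbabilitySampleGeneratorStub { int getNext(); }
    }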
Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=763847&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Fri Apr 10 01:09:55 2009
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.CountingMap;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+
+public class FindQuantiles extends EvalFunc<Map<Object, Object>>{
+ // keys for the weightedparts Map
+ public static final String QUANTILES_LIST = "quantiles.list";
+ public static final String WEIGHTED_PARTS = "weighted.parts";
+
+ BagFactory mBagFactory = BagFactory.getInstance();
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ boolean[] mAsc;
+ enum State { ALL_ASC, ALL_DESC, MIXED };
+ State mState;
+
+ private class SortComparator implements Comparator<Tuple> {
+ @SuppressWarnings("unchecked")
+ public int compare(Tuple t1, Tuple t2) {
+ switch (mState) {
+ case ALL_ASC:
+ return t1.compareTo(t2);
+
+ case ALL_DESC:
+ return t2.compareTo(t1);
+
+ case MIXED:
+ // Have to break the tuple down and compare it field to field.
+ int sz1 = t1.size();
+ int sz2 = t2.size();
+ if (sz2 < sz1) {
+ return 1;
+ } else if (sz2 > sz1) {
+ return -1;
+ } else {
+ for (int i = 0; i < sz1; i++) {
+ try {
+ int c = DataType.compare(t1.get(i), t2.get(i));
+ if (c != 0) {
+ if (!mAsc[i]) c *= -1;
+ return c;
+ }
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare
tuples", e);
+ }
+ }
+ return 0;
+ }
+ }
+ return -1; // keep the compiler happy
+ }
+ }
+
+ private Comparator<Tuple> mComparator = new SortComparator();
+ private FuncSpec mUserComparisonFuncSpec;
+ private ComparisonFunc mUserComparisonFunc;
+
+
+ @SuppressWarnings("unchecked")
+ private void instantiateFunc() {
+ if(mUserComparisonFuncSpec != null) {
+ this.mUserComparisonFunc = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.mUserComparisonFuncSpec);
+ this.mUserComparisonFunc.setReporter(reporter);
+ this.mComparator = mUserComparisonFunc;
+ }
+ }
+
+ // We need to instantiate any user defined comparison function
+ // on the backend when the FindQuantiles udf is deserialized
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+ is.defaultReadObject();
+ instantiateFunc();
+ }
+
+
+ public FindQuantiles() {
+ mState = State.ALL_ASC;
+ }
+
+ public FindQuantiles(String[] args) {
+ int startIndex = 0;
+ int ascFlagsLength = args.length;
+ // the first argument may be the information
+ // about user defined comparison function if one
+ // was specified
+ if(args[0].startsWith(MRCompiler.USER_COMPARATOR_MARKER)) {
+ mUserComparisonFuncSpec = new FuncSpec(
+ args[0].substring(MRCompiler.USER_COMPARATOR_MARKER.length()));
+ // skip the first argument now that we used it
+ startIndex++;
+ ascFlagsLength--;
+ }
+
+ mAsc = new boolean[ascFlagsLength];
+ boolean sawAsc = false;
+ boolean sawDesc = false;
+ for (int i = 0; i < ascFlagsLength; i++) {
+ mAsc[i] = Boolean.parseBoolean(args[i + startIndex]);
+ if (mAsc[i]) sawAsc = true;
+ else sawDesc = true;
+ }
+ if (sawAsc && sawDesc) mState = State.MIXED;
+ else if (sawDesc) mState = State.ALL_DESC;
+ else mState = State.ALL_ASC; // In case they gave us no args this
+ // defaults to all ascending.
+ }
+
+ /**
+ * The first field in the input tuple is the number of quantiles to generate;
+ * the second field is the *sorted* bag of samples.
+ */
+
+ @Override
+ public Map<Object, Object> exec(Tuple in) throws IOException {
+ Map<Object, Object> output = new HashMap<Object, Object>();
+ if(in==null || in.size()==0)
+ return null;
+ Integer numQuantiles = null;
+ DataBag samples = null;
+ ArrayList<Tuple> quantilesList = new ArrayList<Tuple>();
+ Map<Tuple,Tuple> weightedParts = new HashMap<Tuple, Tuple>();
+ // the sample file has a tuple as under:
+ // (numQuantiles, bag of samples)
+ // numQuantiles here is the reduce parallelism
+ try{
+ numQuantiles = (Integer)in.get(0);
+ samples = (DataBag)in.get(1);
+
+ long numSamples = samples.size();
+ long toSkip = numSamples / numQuantiles;
+ if(toSkip == 0) {
+ // numSamples is < numQuantiles;
+ // set numQuantiles to numSamples
+ numQuantiles = (int)numSamples;
+ toSkip = 1;
+ }
+
+ long ind=0, j=-1, nextQuantile = toSkip-1;
+ for (Tuple it : samples) {
+ if (ind==nextQuantile){
+ ++j;
+ quantilesList.add(it);
+ nextQuantile+=toSkip;
+ if(j==numQuantiles-1)
+ break;
+ }
+ ind++;
+ if (ind % 1000 == 0) progress();
+ }
+ long i=-1;
+ Map<Tuple,CountingMap<Integer>> contribs = new HashMap<Tuple, CountingMap<Integer>>();
+ for (Tuple it : samples){
+ ++i;
+ if (i % 1000 == 0) progress();
+ int partInd = new Long(i/toSkip).intValue(); // which partition
+ if(partInd==numQuantiles) break;
+ // the quantiles array has the element from the sample which is the
+ // last element for a given partition. For example: if numQuantiles
+ // is 5 and number of samples is 100, then toSkip = 20
+ // quantiles[0] = sample[19] // the 20th element
+ // quantiles[1] = sample[39] // the 40th element
+ // and so on. For any element in the sample between 0 and 19, partInd
+ // will be 0. We want to check if a sample element which is
+ // present between 0 and 19 is also the 19th (quantiles[0] element).
+ // This would mean that element might spread over the 0th and 1st
+ // partition. We are looking for contributions to a partition
+ // from such elements.
+
+ // First we only check for sample elements in partitions other than the last one,
+ // < numQuantiles - 1 (partInd is 0 indexed).
+ if(partInd<numQuantiles-1 && areEqual(it,quantilesList.get(partInd))){
+ if(!contribs.containsKey(it)){
+ CountingMap<Integer> cm = new CountingMap<Integer>();
+ cm.put(partInd, 1);
+ contribs.put(it, cm);
+ }
+ else
+ contribs.get(it).put(partInd, 1);
+ }
+ else{
+ // we are either in the last partition (last quantile)
+ // OR the sample element we are currently processing is not
+ // the same as the element in the quantile array for this partition
+ // if we haven't seen this sample item earlier, this is not an
+ // element which crosses partitions - so ignore
+ if(!contribs.containsKey(it))
+ continue;
+ else
+ // we have seen this sample before (in a previous partInd),
+ // add to the contribution associated with this sample - if we had
+ // not seen this sample in a previous partInd, then we would not have
+ // had this in the contribs map! (because of the if above). This
+ // "key" (represented by the sample item) can either go to the
+ // previous partInd or this partInd in the final sort reduce stage.
+ // That is where the amount of contribution to each partInd will
+ // matter and influence the choice.
+ contribs.get(it).put(partInd, 1);
+ }
+ }
+ int k = 0;
+ for(Entry<Tuple, CountingMap<Integer>> ent : contribs.entrySet()){
+ if (k++ % 1000 == 0) progress();
+ Tuple key = ent.getKey(); // sample item which repeats
+
+ // this map will have the contributions of the sample item to the different partitions
+ CountingMap<Integer> value = ent.getValue();
+
+ long total = value.getTotalCount();
+ Tuple probVec = mTupleFactory.newTuple(numQuantiles.intValue());
+ // initialize all contribution fractions for different
+ // partitions to 0.0
+ for (int l = 0; l < numQuantiles; l++) {
+ probVec.set(l, new Float(0.0));
+ }
+ // for each partition that this sample item is present in,
+ // compute the fraction of the total occurrences for that
+ // partition - this will be the probability with which we
+ // will pick this partition in the final sort reduce job
+ // for this sample item
+ for (Entry<Integer,Integer> valEnt : value.entrySet()) {
+ probVec.set(valEnt.getKey(), (float)valEnt.getValue()/total);
+ }
+ weightedParts.put(key, probVec);
+ }
+ output.put(QUANTILES_LIST, mBagFactory.newDefaultBag(quantilesList));
+ output.put(WEIGHTED_PARTS, weightedParts);
+ return output;
+ }catch (Exception e){
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean areEqual(Tuple it, Tuple tuple) {
+ return mComparator.compare(it, tuple)==0;
+ }
+}
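A worked example of the weighted.parts payload FindQuantiles emits (the counts are made up): with numQuantiles = 4, a boundary sample seen twice in partition 0 and three times in partition 1 gets the probability tuple (0.4, 0.6, 0.0, 0.0), which WeightedRangePartitioner turns back into a float[] via getProbVec(). The arithmetic, as a self-contained sketch:

    import java.util.HashMap;
    import java.util.Map;

    public class ProbVecExample {
        public static void main(String[] args) {
            int numQuantiles = 4;
            // Hypothetical per-partition counts for one repeated boundary key,
            // as accumulated in the contribs CountingMap above.
            Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
            counts.put(0, 2);
            counts.put(1, 3);
            long total = 0;
            for (int c : counts.values()) total += c;
            float[] probVec = new float[numQuantiles]; // unset entries stay 0.0f
            for (Map.Entry<Integer, Integer> e : counts.entrySet())
                probVec[e.getKey()] = (float) e.getValue() / total;
            // probVec == [0.4, 0.6, 0.0, 0.0]: in the final sort job this key
            // goes to partition 0 with probability 0.4 and partition 1 with 0.6.
            System.out.println(java.util.Arrays.toString(probVec));
        }
    }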