Author: olga
Date: Tue Apr 14 19:41:10 2009
New Revision: 764931
URL: http://svn.apache.org/viewvc?rev=764931&view=rev
Log:
branch update from trunk
Added:
hadoop/pig/branches/multiquery/src/org/apache/pig/impl/builtin/FindQuantiles.java
- copied unchanged from r764920, hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
hadoop/pig/branches/multiquery/ (props changed)
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml
hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt (props changed)
Propchange: hadoop/pig/branches/multiquery/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 19:41:10 2009
@@ -1 +1 @@
-/hadoop/pig/trunk:741728-763050
+/hadoop/pig/trunk:741728-764920
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Tue Apr 14 19:41:10 2009
@@ -28,12 +28,19 @@
BUG FIXES
+PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
+"FileSystem closed" error on large input (pradeepkth)
+
PIG-693: Parameter to UDF which is an alias returned in another UDF in nested
foreach causes incorrect results (thejas via sms)
PIG-725: javadoc: warning - Multiple sources of package comments found for
package "org.apache.commons.logging" (gkesavan via sms)
+PIG-745: Add DataType.toString() to force basic types to chararray, useful
+for UDFs that want to handle all simple types as strings (ciemo via gates).
+
+
Release 0.2.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml (original)
+++ hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml Tue Apr 14 19:41:10 2009
@@ -295,7 +295,7 @@
<section>
<title>Drop Nulls Before a Join</title>
-<p>This comment only applies to pig on the types branch, as pig 0.1.0 does not have nulls. </p>
+<p>This comment only applies to the pig 0.2.0 branch, as pig 0.1.0 does not have nulls. </p>
<p>With the introduction of nulls, join and cogroup semantics were altered to
work with nulls. The semantic for cogrouping with nulls is that nulls from a
given input are grouped together, but nulls across inputs are not grouped
together. This preserves the semantics of grouping (nulls are collected
together from a single input to be passed to aggregate functions like COUNT)
and the semantics of join (nulls are not joined across inputs). Since
flattening an empty bag results in an empty row, in a standard join the rows
with a null key will always be dropped. The join: </p>
<source>
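
The cookbook advice above boils down to filtering null keys out before the join rather than relying on the join to drop them during the shuffle. A short Pig Latin sketch of the pattern (aliases and field names are invented for illustration; this is not the cookbook's own example):

    -- rows with a null join key would be dropped by the join anyway;
    -- filtering them first avoids shipping them to the reducers
    A = LOAD 'data1' AS (id, val);
    B = LOAD 'data2' AS (id, name);
    A1 = FILTER A BY id IS NOT NULL;
    B1 = FILTER B BY id IS NOT NULL;
    C = JOIN A1 BY id, B1 BY id;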
Modified:
hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml (original)
+++ hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml Tue Apr 14 19:41:10 2009
@@ -32,6 +32,6 @@
-->
<tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" />
<tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" />
- <tab label="Pig 1.0.0 Documentation" dir="" type="visible" />
+ <tab label="Pig 0.2.0 Documentation" dir="" type="visible" />
</tabs>
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Apr 14 19:41:10 2009
@@ -36,6 +36,7 @@
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -153,6 +154,8 @@
private CompilationMessageCollector messageCollector = null;
+ public static String USER_COMPARATOR_MARKER = "user.comparator.func:";
+
public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
this(plan,null);
}
@@ -1354,10 +1357,55 @@
mro.reducePlan.add(nfe2);
mro.reducePlan.connect(pkg, nfe2);
+ // Let's connect the output from the foreach containing
+ // number of quantiles and the sorted bag of samples to
+ // another foreach with the FindQuantiles udf. The input
+ // to the FindQuantiles udf is a project(*) which takes the
+ // foreach input and gives it to the udf
+ PhysicalPlan ep4 = new PhysicalPlan();
+ POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prjStar4.setResultType(DataType.TUPLE);
+ prjStar4.setStar(true);
+ ep4.add(prjStar4);
+
+ List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+ ufInps.add(prjStar4);
+ // Turn the asc/desc array into an array of strings so that we can pass it
+ // to the FindQuantiles function.
+ List<Boolean> ascCols = inpSort.getMAscCols();
+ String[] ascs = new String[ascCols.size()];
+ for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
+ // check if user defined comparator is used in the sort, if so
+ // prepend the name of the comparator as the first field in the
+ // constructor args array to the FindQuantiles udf
+ String[] ctorArgs = ascs;
+ if(sort.isUDFComparatorUsed) {
+ String userComparatorFuncSpec = sort.getMSortFunc().getFuncSpec().toString();
+ ctorArgs = new String[ascs.length + 1];
+ ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
+ for(int j = 0; j < ascs.length; j++) {
+ ctorArgs[j+1] = ascs[j];
+ }
+ }
+
+ POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
+ new FuncSpec(FindQuantiles.class.getName(), ctorArgs));
+ ep4.add(uf);
+ ep4.connect(prjStar4, uf);
+
+ List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+ ep4s.add(ep4);
+ List<Boolean> flattened3 = new ArrayList<Boolean>();
+ flattened3.add(false);
+ POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+
+ mro.reducePlan.add(nfe3);
+ mro.reducePlan.connect(nfe2, nfe3);
+
POStore str = getStore();
str.setSFile(quantFile);
mro.reducePlan.add(str);
- mro.reducePlan.connect(nfe2, str);
+ mro.reducePlan.connect(nfe3, str);
mro.setReduceDone(true);
mro.requestedParallelism = 1;
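
A note on the USER_COMPARATOR_MARKER convention introduced above: the user comparator's FuncSpec and the asc/desc flags travel to FindQuantiles as a single String[] of constructor arguments, with the marker-prefixed entry first when a user comparator is in play. A minimal Java sketch of how a receiving side can unpack such an array (illustrative only, not the actual FindQuantiles code):

    public class CtorArgsSketch {
        static final String USER_COMPARATOR_MARKER = "user.comparator.func:";

        // Unpacks ["user.comparator.func:<FuncSpec>", "true", "false", ...]
        // or, with no user comparator, just ["true", "false", ...].
        static void unpack(String[] ctorArgs) {
            String comparatorFuncSpec = null;
            int first = 0;
            if (ctorArgs.length > 0 && ctorArgs[0].startsWith(USER_COMPARATOR_MARKER)) {
                comparatorFuncSpec = ctorArgs[0].substring(USER_COMPARATOR_MARKER.length());
                first = 1;
            }
            boolean[] ascending = new boolean[ctorArgs.length - first];
            for (int i = first; i < ctorArgs.length; i++) {
                ascending[i - first] = Boolean.parseBoolean(ctorArgs[i]);
            }
            // comparatorFuncSpec (possibly null) and ascending[] now describe the sort
        }
    }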
Modified:
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Tue Apr 14 19:41:10 2009
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -29,12 +30,14 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullableBytesWritable;
@@ -68,6 +71,7 @@
return gen.getNext();
}
+ @SuppressWarnings("unchecked")
public void configure(JobConf job) {
this.job = job;
String quantilesFile = job.get("pig.quantilesFile", "");
@@ -78,120 +82,41 @@
try{
InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
BinStorage loader = new BinStorage();
- ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>();
+ DataBag quantilesList;
loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple t = loader.getNext();
if(t==null) throw new RuntimeException("Empty samples file");
// the Quantiles file has a tuple as under:
// (numQuantiles, bag of samples)
// numQuantiles here is the reduce parallelism
- numQuantiles = (Integer) t.get(0);
- samples = (DataBag) t.get(1);
- long numSamples = samples.size();
- long toSkip = numSamples / numQuantiles;
- if(toSkip == 0) {
- // numSamples is < numQuantiles;
- // set numQuantiles to numSamples
- numQuantiles = (int)numSamples;
- toSkip = 1;
- }
-
- long ind=0, j=-1, nextQuantile = toSkip-1;
- for (Tuple it : samples) {
- if (ind==nextQuantile){
- ++j;
- quantilesList.add(getPigNullableWritable(it));
- nextQuantile+=toSkip;
- if(j==numQuantiles-1)
- break;
- }
- ind++;
- //TODO how do we report progress?
- //if (i % 1000 == 0) progress();
- // Currently there is no way to report progress since
- // in configure() we cannot get a handle to the reporter
- // (even PhysicalOperator.getReporter() does not work! It is
- // set to null.) Hopefully the work done here wll be < 10 minutes
- // since we are dealing with 100* num_mapper samples. When
- // RandomSampleLoader becomes an operator or UDF instead of a
- // loader hopefully we can intelligently decide on the number
- // of samples (instead of the static 100) and then not being
- // able to report progress may not be a big issue.
- }
+ Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
+ quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+ Map<Tuple, Tuple> weightedPartsData = (Map<Tuple, Tuple>) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
convertToArray(quantilesList);
- long i=-1;
- Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>();
- for (Tuple it : samples){
- ++i;
- PigNullableWritable sample = getPigNullableWritable(it);
- int partInd = new Long(i/toSkip).intValue(); // which partition
- if(partInd==numQuantiles) break;
- // the quantiles array has the element from the sample which is the
- // last element for a given partition. For example: if numQunatiles
- // is 5 and number of samples is 100, then toSkip = 20
- // quantiles[0] = sample[19] // the 20th element
- // quantiles[1] = sample[39] // the 40th element
- // and so on. For any element in the sample between 0 and 19, partInd
- // will be 0. We want to check if a sample element which is
- // present between 0 and 19 and is also the 19th (quantiles[0] element).
- // This would mean that element might spread over the 0th and 1st
- // partition. We are looking for contributions to a partition
- // from such elements.
-
- // First We only check for sample elements in partitions other than the last one
- // < numQunatiles -1 (partInd is 0 indexed).
- if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){
- if(!contribs.containsKey(sample)){
- CountingMap<Integer> cm = new CountingMap<Integer>();
- cm.put(partInd, 1);
- contribs.put(sample, cm);
- }
- else
- contribs.get(sample).put(partInd, 1);
- }
- else{
- // we are either in the last partition (last quantile)
- // OR the sample element we are currently processing is not
- // the same as the element in the quantile array for this partition
- // if we haven't seen this sample item earlier, this is not an
- // element which crosses partitions - so ignore
- if(!contribs.containsKey(sample))
- continue;
- else
- // we have seen this sample before (in a previous partInd),
- // add to the contribution associated with this sample - if we had
- // not seen this sample in a previous partInd, then we have not
- // had this in the contribs map! (because of the if above).This
- // sample can either go to the previous partInd or this partInd
- // in the final sort reduce stage. That is where the amount of
- // contribution to each partInd will matter and influence the choice.
- contribs.get(sample).put(partInd, 1);
- }
- }
- for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){
- PigNullableWritable key = ent.getKey(); // sample item which repeats
-
- // this map will have the contributions of the sample item to the different partitions
- CountingMap<Integer> value = ent.getValue();
-
- long total = value.getTotalCount();
- float[] probVec = new float[numQuantiles];
- // for each partition that this sample item is present in,
- // compute the fraction of the total occurences for that
- // partition - this will be the probability with which we
- // will pick this partition in the final sort reduce job
- // for this sample item
- for (Entry<Integer,Integer> valEnt : value.entrySet()) {
- probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
- }
-// weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec));
- weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec));
+ for(Entry<Tuple, Tuple> ent : weightedPartsData.entrySet()){
+ Tuple key = ent.getKey(); // sample item which repeats
+ float[] probVec = getProbVec(ent.getValue());
+ weightedParts.put(getPigNullableWritable(key),
+ new DiscreteProbabilitySampleGenerator(probVec));
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
+ /**
+ * @param values tuple whose fields are the per-partition probabilities for a repeating sample item
+ * @return the probabilities as a float array
+ * @throws ExecException if a field cannot be read from the tuple
+ */
+ private float[] getProbVec(Tuple values) throws ExecException {
+ float[] probVec = new float[values.size()];
+ for(int i = 0; i < values.size(); i++) {
+ probVec[i] = (Float)values.get(i);
+ }
+ return probVec;
+ }
+
private PigNullableWritable getPigNullableWritable(Tuple t) {
try {
// user comparators work with tuples - so if user comparator
@@ -219,24 +144,38 @@
}
private void convertToArray(
- ArrayList<PigNullableWritable> q) {
+ DataBag quantilesListAsBag) {
+ ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
if ("true".equals(job.get("pig.usercomparator")) ||
- q.get(0).getClass().equals(NullableTuple.class)) {
- quantiles = q.toArray(new NullableTuple[0]);
- } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
- quantiles = q.toArray(new NullableBytesWritable[0]);
- } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
- quantiles = q.toArray(new NullableDoubleWritable[0]);
- } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
- quantiles = q.toArray(new NullableFloatWritable[0]);
- } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
- quantiles = q.toArray(new NullableIntWritable[0]);
- } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
- quantiles = q.toArray(new NullableLongWritable[0]);
- } else if (q.get(0).getClass().equals(NullableText.class)) {
- quantiles = q.toArray(new NullableText[0]);
+ quantilesList.get(0).getClass().equals(NullableTuple.class)) {
+ quantiles = quantilesList.toArray(new NullableTuple[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableBytesWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableBytesWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableDoubleWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableDoubleWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableFloatWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableFloatWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableIntWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableIntWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableLongWritable.class)) {
+ quantiles = quantilesList.toArray(new NullableLongWritable[0]);
+ } else if (quantilesList.get(0).getClass().equals(NullableText.class)) {
+ quantiles = quantilesList.toArray(new NullableText[0]);
} else {
throw new RuntimeException("Unexpected class in " +
this.getClass().getSimpleName());
}
}
+
+ /**
+ * @param quantilesListAsBag bag of sample tuples read from the quantiles file
+ * @return the samples converted to a list of PigNullableWritables
+ */
+ private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
+
+ ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>();
+ for (Tuple tuple : quantilesListAsBag) {
+ list.add(getPigNullableWritable(tuple));
+ }
+ return list;
+ }
}
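
Taken together with the configure() changes above, the new contract is that the quantiles file holds a single tuple whose first field is a map with two entries. A sketch of that layout (the key constants FindQuantiles.QUANTILES_LIST and FindQuantiles.WEIGHTED_PARTS are the ones referenced in the diff; the builder shown here is illustrative, not the actual FindQuantiles implementation):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.builtin.FindQuantiles;

    public class QuantileMapSketch {
        // Illustrative shape of t.get(0) as unpacked by configure() above.
        static Map<String, Object> buildQuantileMap(DataBag quantilesBag,
                                                    Map<Tuple, Tuple> weightedParts) {
            Map<String, Object> quantileMap = new HashMap<String, Object>();
            // bag of partition-boundary samples, one per reducer
            quantileMap.put(FindQuantiles.QUANTILES_LIST, quantilesBag);
            // repeating sample item -> tuple of per-partition probabilities
            quantileMap.put(FindQuantiles.WEIGHTED_PARTS, weightedParts);
            return quantileMap;
        }
    }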
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java Tue Apr 14 19:41:10 2009
@@ -619,6 +619,64 @@
}
/**
+ * Force a data object to a String, if possible. Any simple (atomic) type
+ * can be forced to a String including ByteArray. Complex types cannot be
+ * forced to a String. This isn't particularly efficient, so if you
+ * already <b>know</b> that the object you have is a String you
+ * should just cast it.
+ * @return The object as a String.
+ * @throws ExecException if the type can't be forced to a String.
+ */
+ public static String toString(Object o) throws ExecException {
+ try {
+ switch (findType(o)) {
+ case INTEGER:
+ return ((Integer)o).toString();
+
+ case LONG:
+ return ((Long)o).toString();
+
+ case FLOAT:
+ return ((Float)o).toString();
+
+ case DOUBLE:
+ return ((Double)o).toString();
+
+ case BYTEARRAY:
+ return ((DataByteArray)o).toString();
+
+ case CHARARRAY:
+ return ((String)o);
+
+ case NULL:
+ return null;
+
+ case BOOLEAN:
+ return ((Boolean)o).toString();
+
+ case BYTE:
+ return ((Byte)o).toString();
+
+ case MAP:
+ case TUPLE:
+ case BAG:
+ case UNKNOWN:
+ default:
+ int errCode = 1071;
+ String msg = "Cannot convert a " + findTypeName(o) +
+ " to a String";
+ throw new ExecException(msg, errCode, PigException.INPUT);
+ }
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2054;
+ String msg = "Internal error. Could not convert " + o +
" to String.";
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ /**
* If this object is a map, return it as a map.
* This isn't particularly efficient, so if you
* already <b>know</b> that the object you have is a Map you
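
A usage sketch for the new DataType.toString(), matching the PIG-745 use case of a UDF that handles all simple types as strings (the UDF itself is hypothetical; only DataType.toString() comes from the change above):

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF: upper-cases its first argument whatever its simple type.
    public class ToUpper extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) return null;
            // forces int/long/float/double/bytearray/boolean to a String;
            // throws ExecException for maps, tuples and bags
            String s = DataType.toString(input.get(0));
            return (s == null) ? null : s.toUpperCase();
        }
    }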
Propchange: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 19:41:10 2009
@@ -1 +1 @@
-/hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt:758070-763050
+/hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt:758070-764920