Author: olga
Date: Tue Apr 14 19:41:10 2009
New Revision: 764931

URL: http://svn.apache.org/viewvc?rev=764931&view=rev
Log:
branch update from trunk

Added:
    hadoop/pig/branches/multiquery/src/org/apache/pig/impl/builtin/FindQuantiles.java
      - copied unchanged from r764920, hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
    hadoop/pig/branches/multiquery/   (props changed)
    hadoop/pig/branches/multiquery/CHANGES.txt
    hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml
    hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
    hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt   (props changed)

Propchange: hadoop/pig/branches/multiquery/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 19:41:10 2009
@@ -1 +1 @@
-/hadoop/pig/trunk:741728-763050
+/hadoop/pig/trunk:741728-764920

Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Tue Apr 14 19:41:10 2009
@@ -28,12 +28,19 @@
 
 BUG FIXES
 
+PIG-733: Order by sampling dumps entire sample to hdfs which causes dfs
+"FileSystem closed" error on large input (pradeepkth)
+
 PIG-693: Parameter to UDF which is an alias returned in another UDF in nested
 foreach causes incorrect results (thejas via sms)
 
 PIG-725: javadoc: warning - Multiple sources of package comments found for
 package "org.apache.commons.logging" (gkesavan via sms)
 
+PIG-745: Add DataType.toString() to force basic types to chararray, useful
+for UDFs that want to handle all simple types as strings (ciemo via gates).
+
+
 Release 0.2.0 - Unreleased
 
 INCOMPATIBLE CHANGES

Modified: hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml (original)
+++ hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/cookbook.xml Tue Apr 14 19:41:10 2009
@@ -295,7 +295,7 @@
 
 <section>
 <title>Drop Nulls Before a Join</title>
-<p>This comment only applies to pig on the types branch, as pig 0.1.0 does not have nulls. </p>
+<p>This comment only applies to pig 0.2.0 branch, as pig 0.1.0 does not have nulls. </p>
 <p>With the introduction of nulls, join and cogroup semantics were altered to work with nulls.  The semantic for cogrouping with nulls is that nulls from a given input are grouped together, but nulls across inputs are not grouped together.  This preserves the semantics of grouping (nulls are collected together from a single input to be passed to aggregate functions like COUNT) and the semantics of join (nulls are not joined across inputs).  Since flattening an empty bag results in an empty row, in a standard join the rows with a null key will always be dropped.  The join:  </p>
 
 <source>

Modified: hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml (original)
+++ hadoop/pig/branches/multiquery/src/docs/src/documentation/content/xdocs/tabs.xml Tue Apr 14 19:41:10 2009
@@ -32,6 +32,6 @@
   -->
  <tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" /> 
  <tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" /> 
-  <tab label="Pig 1.0.0 Documentation" dir="" type="visible" /> 
+  <tab label="Pig 0.2.0 Documentation" dir="" type="visible" /> 
 
 </tabs>

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Apr 14 19:41:10 2009
@@ -36,6 +36,7 @@
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -153,6 +154,8 @@
     
     private CompilationMessageCollector messageCollector = null;
     
+    public static String USER_COMPARATOR_MARKER = "user.comparator.func:";
+    
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
@@ -1354,10 +1357,55 @@
         mro.reducePlan.add(nfe2);
         mro.reducePlan.connect(pkg, nfe2);
         
+        // Let's connect the output from the foreach containing
+        // number of quantiles and the sorted bag of samples to
+        // another foreach with the FindQuantiles udf. The input
+        // to the FindQuantiles udf is a project(*) which takes the 
+        // foreach input and gives it to the udf
+        PhysicalPlan ep4 = new PhysicalPlan();
+        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+        
+        List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+        ufInps.add(prjStar4);
+        // Turn the asc/desc array into an array of strings so that we can pass it
+        // to the FindQuantiles function.
+        List<Boolean> ascCols = inpSort.getMAscCols();
+        String[] ascs = new String[ascCols.size()];
+        for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
+        // check if user defined comparator is used in the sort, if so
+        // prepend the name of the comparator as the first fields in the
+        // constructor args array to the FindQuantiles udf
+        String[] ctorArgs = ascs;
+        if(sort.isUDFComparatorUsed) {
+            String userComparatorFuncSpec = sort.getMSortFunc().getFuncSpec().toString();
+            ctorArgs = new String[ascs.length + 1];
+            ctorArgs[0] = USER_COMPARATOR_MARKER + userComparatorFuncSpec;
+            for(int j = 0; j < ascs.length; j++) {
+                ctorArgs[j+1] = ascs[j];
+            }
+        }
+        
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, 
+            new FuncSpec(FindQuantiles.class.getName(), ctorArgs));
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+        
+        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
+        
+        mro.reducePlan.add(nfe3);
+        mro.reducePlan.connect(nfe2, nfe3);
+        
         POStore str = getStore();
         str.setSFile(quantFile);
         mro.reducePlan.add(str);
-        mro.reducePlan.connect(nfe2, str);
+        mro.reducePlan.connect(nfe3, str);
         
         mro.setReduceDone(true);
         mro.requestedParallelism = 1;

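For readers tracing the new sort path above: the constructor args handed to the FindQuantiles UDF are the per-column ascending/descending flags as strings, with the comparator's FuncSpec prepended under the USER_COMPARATOR_MARKER tag whenever a user-defined comparator is in play, so FindQuantiles can tell the comparator apart from the flags that follow it. A minimal, self-contained sketch of just that string handling (the demo class and comparator name are hypothetical, not part of this commit):

    import java.util.Arrays;
    import java.util.List;

    public class CtorArgsDemo {
        // same marker value this commit adds as MRCompiler.USER_COMPARATOR_MARKER
        static final String USER_COMPARATOR_MARKER = "user.comparator.func:";

        // Build the FindQuantiles constructor-args array: one "true"/"false"
        // string per sort column, preceded by the marker-tagged comparator
        // FuncSpec when one is supplied.
        static String[] buildCtorArgs(List<Boolean> ascCols, String comparatorFuncSpec) {
            String[] ascs = new String[ascCols.size()];
            for (int i = 0; i < ascCols.size(); i++) {
                ascs[i] = ascCols.get(i).toString();
            }
            if (comparatorFuncSpec == null) {
                return ascs;
            }
            String[] ctorArgs = new String[ascs.length + 1];
            ctorArgs[0] = USER_COMPARATOR_MARKER + comparatorFuncSpec;
            System.arraycopy(ascs, 0, ctorArgs, 1, ascs.length);
            return ctorArgs;
        }

        public static void main(String[] args) {
            String[] ctorArgs = buildCtorArgs(Arrays.asList(true, false),
                    "org.example.MyComparator");
            // prints [user.comparator.func:org.example.MyComparator, true, false]
            System.out.println(Arrays.toString(ctorArgs));
        }
    }
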
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Tue Apr 14 19:41:10 2009
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -29,12 +30,14 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullableBytesWritable;
@@ -68,6 +71,7 @@
         return gen.getNext();
     }
 
+    @SuppressWarnings("unchecked")
     public void configure(JobConf job) {
         this.job = job;
         String quantilesFile = job.get("pig.quantilesFile", "");
@@ -78,120 +82,41 @@
         try{
             InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
             BinStorage loader = new BinStorage();
-            ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>();
+            DataBag quantilesList;
             loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
             Tuple t = loader.getNext();
             if(t==null) throw new RuntimeException("Empty samples file");
             // the Quantiles file has a tuple as under:
             // (numQuantiles, bag of samples) 
             // numQuantiles here is the reduce parallelism
-            numQuantiles = (Integer) t.get(0);
-            samples = (DataBag) t.get(1);
-            long numSamples = samples.size();
-            long toSkip = numSamples / numQuantiles;
-            if(toSkip == 0) {
-                // numSamples is < numQuantiles;
-                // set numQuantiles to numSamples
-                numQuantiles = (int)numSamples;
-                toSkip = 1;
-            }
-            
-            long ind=0, j=-1, nextQuantile = toSkip-1;
-            for (Tuple it : samples) {
-                if (ind==nextQuantile){
-                    ++j;
-                    quantilesList.add(getPigNullableWritable(it));
-                    nextQuantile+=toSkip;
-                    if(j==numQuantiles-1)
-                        break;
-                }
-                ind++;
-                //TODO how do we report progress?
-                //if (i % 1000 == 0) progress();
-                // Currently there is no way to report progress since 
-                // in configure() we cannot get a handle to the reporter
-                // (even PhysicalOperator.getReporter() does not work! It is
-                // set to null.) Hopefully the work done here wll be < 10 minutes
-                // since we are dealing with 100* num_mapper samples. When
-                // RandomSampleLoader becomes an operator or UDF instead of a
-                // loader hopefully we can intelligently decide on the number
-                // of samples (instead of the static 100) and then not being
-                // able to report progress may not be a big issue.
-            }
+            Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
+            quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+            Map<Tuple, Tuple> weightedPartsData = (Map<Tuple, Tuple>) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             convertToArray(quantilesList);
-            long i=-1;
-            Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>();
-            for (Tuple it : samples){
-                ++i;
-                PigNullableWritable sample = getPigNullableWritable(it);
-                int partInd = new Long(i/toSkip).intValue(); // which partition
-                if(partInd==numQuantiles) break;
-                // the quantiles array has the element from the sample which is the
-                // last element for a given partition. For example: if numQunatiles 
-                // is 5 and number of samples is 100, then toSkip = 20 
-                // quantiles[0] = sample[19] // the 20th element
-                // quantiles[1] = sample[39] // the 40th element
-                // and so on. For any element in the sample between 0 and 19, partInd
-                // will be 0. We want to check if a sample element which is
-                // present between 0 and 19 and is also the 19th (quantiles[0] element).
-                // This would mean that element might spread over the 0th and 1st 
-                // partition. We are looking for contributions to a partition
-                // from such elements. 
-                
-                // First We only check for sample elements in partitions other than the last one
-                // < numQunatiles -1 (partInd is 0 indexed). 
-                if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){
-                    if(!contribs.containsKey(sample)){
-                        CountingMap<Integer> cm = new CountingMap<Integer>();
-                        cm.put(partInd, 1);
-                        contribs.put(sample, cm);
-                    }
-                    else
-                        contribs.get(sample).put(partInd, 1);
-                }
-                else{ 
-                    // we are either in the last partition (last quantile)
-                    // OR the sample element we are currently processing is not
-                    // the same as the element in the quantile array for this partition
-                    // if we haven't seen this sample item earlier, this is not an
-                    // element which crosses partitions - so ignore
-                    if(!contribs.containsKey(sample))
-                        continue;
-                    else
-                        // we have seen this sample before (in a previous partInd), 
-                        // add to the contribution associated with this sample - if we had 
-                        // not seen this sample in a previous partInd, then we have not
-                        // had this in the contribs map! (because of the if above).This 
-                        // sample can either go to the previous partInd or this partInd 
-                        // in the final sort reduce stage. That is where the amount of 
-                        // contribution to each partInd will matter and influence the choice.
-                        contribs.get(sample).put(partInd, 1);
-                }
-            }
-            for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){
-                PigNullableWritable key = ent.getKey(); // sample item which repeats
-                
-                // this map will have the contributions of the sample item to the different partitions
-                CountingMap<Integer> value = ent.getValue(); 
-                
-                long total = value.getTotalCount();
-                float[] probVec = new float[numQuantiles];
-                // for each partition that this sample item is present in,
-                // compute the fraction of the total occurences for that
-                // partition - this will be the probability with which we
-                // will pick this partition in the final sort reduce job
-                // for this sample item
-                for (Entry<Integer,Integer> valEnt : value.entrySet()) {
-                    probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
-                }
-//                weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec));
-                weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec));
+            for(Entry<Tuple, Tuple> ent : weightedPartsData.entrySet()){
+                Tuple key = ent.getKey(); // sample item which repeats
+                float[] probVec = getProbVec(ent.getValue());
+                weightedParts.put(getPigNullableWritable(key), 
+                        new DiscreteProbabilitySampleGenerator(probVec));
             }
         }catch (Exception e){
             throw new RuntimeException(e);
         }
     }
 
+    /**
+     * @param value
+     * @return
+     * @throws ExecException 
+     */
+    private float[] getProbVec(Tuple values) throws ExecException {
+        float[] probVec = new float[values.size()];
+        for(int i = 0; i < values.size(); i++) {
+            probVec[i] = (Float)values.get(i);
+        }
+        return probVec;
+    }
+
     private PigNullableWritable getPigNullableWritable(Tuple t) {
         try {
             // user comparators work with tuples - so if user comparator
@@ -219,24 +144,38 @@
     }
 
     private void convertToArray(
-            ArrayList<PigNullableWritable> q) {
+            DataBag quantilesListAsBag) {
+        ArrayList<PigNullableWritable> quantilesList = getList(quantilesListAsBag);
         if ("true".equals(job.get("pig.usercomparator")) ||
-                q.get(0).getClass().equals(NullableTuple.class)) {
-            quantiles = q.toArray(new NullableTuple[0]);
-        } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
-            quantiles = q.toArray(new NullableBytesWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
-            quantiles = q.toArray(new NullableDoubleWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
-            quantiles = q.toArray(new NullableFloatWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
-            quantiles = q.toArray(new NullableIntWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
-            quantiles = q.toArray(new NullableLongWritable[0]);
-        } else if (q.get(0).getClass().equals(NullableText.class)) {
-            quantiles = q.toArray(new NullableText[0]);
+                quantilesList.get(0).getClass().equals(NullableTuple.class)) {
+            quantiles = quantilesList.toArray(new NullableTuple[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableBytesWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableBytesWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableDoubleWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableDoubleWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableFloatWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableFloatWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableIntWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableIntWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableLongWritable.class)) {
+            quantiles = quantilesList.toArray(new NullableLongWritable[0]);
+        } else if (quantilesList.get(0).getClass().equals(NullableText.class)) {
+            quantiles = quantilesList.toArray(new NullableText[0]);
         } else {
             throw new RuntimeException("Unexpected class in " + this.getClass().getSimpleName());
         }
     }
+
+    /**
+     * @param quantilesListAsBag
+     * @return
+     */
+    private ArrayList<PigNullableWritable> getList(DataBag quantilesListAsBag) {
+        
+        ArrayList<PigNullableWritable> list = new ArrayList<PigNullableWritable>();
+        for (Tuple tuple : quantilesListAsBag) {
+            list.add(getPigNullableWritable(tuple));
+        }
+        return list;
+    }
 }

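The net effect of this change: the sample-scanning logic that previously ran in every partitioner's configure() has moved into the new FindQuantiles UDF, so the quantiles file now carries a precomputed map (FindQuantiles.QUANTILES_LIST plus FindQuantiles.WEIGHTED_PARTS) and configure() only deserializes it. For a key whose samples straddle a partition boundary, the WEIGHTED_PARTS entry is a probability vector, and the partitioner draws a partition index from it at runtime. DiscreteProbabilitySampleGenerator's source is not part of this diff; a rough sketch of what such a generator does, assuming standard cumulative-sum sampling over the vector:

    import java.util.Arrays;
    import java.util.Random;

    public class ProbVecDemo {
        // Pick index i with probability probVec[i] (cumulative-sum sampling).
        // Illustration only; not the actual DiscreteProbabilitySampleGenerator code.
        static int sample(float[] probVec, Random rng) {
            float r = rng.nextFloat();
            float cumulative = 0f;
            for (int i = 0; i < probVec.length; i++) {
                cumulative += probVec[i];
                if (r < cumulative) {
                    return i;
                }
            }
            return probVec.length - 1; // guard against float rounding drift
        }

        public static void main(String[] args) {
            // hypothetical key whose samples fell 60/40 across partitions 2 and 3
            float[] probVec = {0f, 0f, 0.6f, 0.4f};
            Random rng = new Random(42);
            int[] counts = new int[probVec.length];
            for (int i = 0; i < 10000; i++) {
                counts[sample(probVec, rng)]++;
            }
            System.out.println(Arrays.toString(counts)); // roughly [0, 0, 6000, 4000]
        }
    }
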
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java?rev=764931&r1=764930&r2=764931&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/data/DataType.java Tue Apr 14 19:41:10 2009
@@ -619,6 +619,64 @@
     }
 
     /**
+     * Force a data object to a String, if possible.  Any simple (atomic) type
+     * can be forced to a String including ByteArray.  Complex types cannot be
+     * forced to a String.  This isn't particularly efficient, so if you
+     * already <b>know</b> that the object you have is a String you
+     * should just cast it.
+     * @return The object as a String.
+     * @throws ExecException if the type can't be forced to a String.
+     */
+    public static String toString(Object o) throws ExecException {
+        try {
+                       switch (findType(o)) {
+                       case INTEGER:
+                           return ((Integer)o).toString();
+
+                       case LONG:
+                           return ((Long)o).toString();
+
+                       case FLOAT:
+                           return ((Float)o).toString();
+
+                       case DOUBLE:
+                           return ((Double)o).toString();
+
+                       case BYTEARRAY:
+                           return ((DataByteArray)o).toString();
+
+                       case CHARARRAY:
+                           return ((String)o);
+
+                       case NULL:
+                           return null;
+
+                       case BOOLEAN:
+                           return ((Boolean)o).toString();
+
+                       case BYTE:
+                           return ((Byte)o).toString();
+
+                       case MAP:
+                       case TUPLE:
+                       case BAG:
+                       case UNKNOWN:
+                       default:
+                           int errCode = 1071;
+                           String msg = "Cannot convert a " + findTypeName(o) +
+                           " to a String";
+                           throw new ExecException(msg, errCode, PigException.INPUT);
+                       }
+               } catch (ExecException ee) {
+                       throw ee;
+               } catch (Exception e) {
+                       int errCode = 2054;
+                       String msg = "Internal error. Could not convert " + o + " to String.";
+                       throw new ExecException(msg, errCode, PigException.BUG);
+               }
+    }
+
+    /**
      * If this object is a map, return it as a map.
      * This isn't particularly efficient, so if you
      * already <b>know</b> that the object you have is a Map you

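The PIG-745 entry in CHANGES.txt describes the intended use of the DataType.toString() added above: a UDF that wants to treat every simple type as a string. A hypothetical UDF sketch against the EvalFunc API (the ToUpper class itself is illustrative, not part of this commit):

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataType;
    import org.apache.pig.data.Tuple;

    // Hypothetical UDF: upper-cases its first argument regardless of which
    // simple type (int, long, float, double, chararray, bytearray, ...) it
    // arrives as.
    public class ToUpper extends EvalFunc<String> {
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) {
                return null;
            }
            // Per the diff above, DataType.toString() accepts any atomic type
            // (returning null for NULL) and throws an ExecException for maps,
            // tuples, and bags.
            String s = DataType.toString(input.get(0));
            return (s == null) ? null : s.toUpperCase();
        }
    }
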
Propchange: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 14 19:41:10 2009
@@ -1 +1 @@
-/hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt:758070-763050
+/hadoop/pig/trunk/test/org/apache/pig/test/utils/dotGraph/DOTParser.jjt:758070-764920

