Author: gates
Date: Thu Feb 11 19:34:21 2010
New Revision: 909116

URL: http://svn.apache.org/viewvc?rev=909116&view=rev
Log:
PIG-1217: Fix argToFuncMapping in Piggybank Top function.

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataType.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 11 19:34:21 2010
@@ -93,6 +93,8 @@
 
 BUG FIXES
 
+PIG-1217: Fix argToFuncMapping in Piggybank Top function (dvryaboy via gates)
+
 PIG-1154: Local Mode fails when hadoop config directory is specified in 
             classpath (ankit.modi via gates)
 

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java Thu Feb 11 19:34:21 2010
@@ -23,6 +23,11 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -30,11 +35,13 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
- * TopN UDF accepts a bag of tuples and returns top-n tuples depending upon the
+ * Top UDF accepts a bag of tuples and returns top-n tuples depending upon the
  * tuple field value of type long. Both n and field number needs to be provided
  * to the UDF. The UDF iterates through the input bag and just retains top-n
  * tuples by storing them in a priority queue of size n+1 where priority is the
@@ -43,6 +50,8 @@
  * UDF is especially helpful for turning the nested grouping operation inside
  * out and retaining top-n in a nested group. 
  * 
+ * Assumes all tuples in the bag contain an element of the same type in the compared column.
+ * 
  * Sample usage: 
  * A = LOAD 'test.tsv' as (first: chararray, second: chararray); 
  * B = GROUP A BY (first, second);
@@ -53,95 +62,288 @@
  *          GENERATE FLATTEN(result); 
  *  }
  */
-public class Top extends EvalFunc<DataBag> {
+public class Top extends EvalFunc<DataBag> implements Algebraic{
+    private static final Log log = LogFactory.getLog(Top.class);
+    static BagFactory mBagFactory = BagFactory.getInstance();
+    static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private Random randomizer = new Random();
+
+    static class TupleComparator implements Comparator<Tuple> {
+        private final int fieldNum;
+        private byte datatype;
+        private boolean typeFound=false;
+
+        public TupleComparator(int fieldNum) {
+            this.fieldNum = fieldNum;
+        }
+
+        /*          
+         * (non-Javadoc)
+         * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+         */
+        @Override
+        public int compare(Tuple o1, Tuple o2) {
+            if (o1 == null)
+                return -1;
+            if (o2 == null)
+                return 1;
+            try {
+                Object field1 = o1.get(fieldNum);
+                Object field2 = o2.get(fieldNum);
+                if (!typeFound) {
+                    datatype = DataType.findType(field1);
+                    typeFound = true;
+                }
+                return DataType.compare(field1, field2, datatype, datatype);
+            } catch (ExecException e) {
+                throw new RuntimeException("Error while comparing o1:" + o1
+                        + " and o2:" + o2, e);
+            }
+        }
+    }
+
+    @Override
+    public DataBag exec(Tuple tuple) throws IOException {
+        if (tuple == null || tuple.size() < 3) {
+            return null;
+        }
+        try {
+            int n = (Integer) tuple.get(0);
+            int fieldNum = (Integer) tuple.get(1);
+            DataBag inputBag = (DataBag) tuple.get(2);
+            PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                    new TupleComparator(fieldNum));
+            updateTop(store, n, inputBag);
+            DataBag outputBag = mBagFactory.newDefaultBag();
+            for (Tuple t : store) {
+                outputBag.add(t);
+            }
+            if (log.isDebugEnabled()) {
+                if (randomizer.nextInt(1000) == 1) {
+                    log.debug("outputting a bag: ");
+                    for (Tuple t : outputBag) 
+                        log.debug("outputting "+t.toDelimitedString("\t"));
+                    log.debug("==================");
+                }
+            }
+            return outputBag;
+        } catch (ExecException e) {
+            throw new RuntimeException("ExecException executing function: ", e);
+        } catch (Exception e) {
+            throw new RuntimeException("General Exception executing function: " + e);
+        }
+    }
+
+    protected static void updateTop(PriorityQueue<Tuple> store, int limit, DataBag inputBag) {
+        Iterator<Tuple> itr = inputBag.iterator();
+        while (itr.hasNext()) {
+            Tuple t = itr.next();
+            store.add(t);
+            if (store.size() > limit)
+                store.poll();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>(3);
+        fields.add(new Schema.FieldSchema(null, DataType.INTEGER));
+        fields.add(new Schema.FieldSchema(null, DataType.INTEGER));
+        fields.add(new Schema.FieldSchema(null, DataType.BAG));
+        FuncSpec funcSpec = new FuncSpec(this.getClass().getName(), new Schema(fields));
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(1);
+        funcSpecs.add(funcSpec);
+        return funcSpecs;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            if (input.size() < 3) {
+                return null;
+            }
+            Schema.FieldSchema bagFs = new Schema.FieldSchema(null,
+                    input.getField(2).schema, DataType.BAG);
+            return new Schema(bagFs);
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    @Override
+    public String getInitial() {
+        return Initial.class.getName();
+    }
+
+    @Override
+    public String getIntermed() {
+        return Intermed.class.getName();
+    }
+
+    @Override
+    public String getFinal() {
+        return Final.class.getName();
+    }
+
+    /*
+     * Same as normal code-path exec, but outputs a Tuple with the schema
+     * <Int, Int, DataBag> -- same schema as expected input.
+     */
+    static public class Initial extends EvalFunc<Tuple> {
+        //private static final Log log = LogFactory.getLog(Initial.class);
+        //private final Random randomizer = new Random();
+        @Override
+        public Tuple exec(Tuple tuple) throws IOException {
+            if (tuple == null || tuple.size() < 3) {
+                return null;
+            }
+            
+            try {
+                int n = (Integer) tuple.get(0);
+                int fieldNum = (Integer) tuple.get(1);
+                DataBag inputBag = (DataBag) tuple.get(2);
+                Tuple retTuple = mTupleFactory.newTuple(3);
+                DataBag outputBag = mBagFactory.newDefaultBag();
+                // initially, there should only be one, so not much point in doing the priority queue
+                for (Tuple t : inputBag) {
+                    outputBag.add(t);
+                }
+                retTuple.set(0, n);
+                retTuple.set(1,fieldNum);
+                retTuple.set(2, outputBag);               
+                return retTuple;
+            } catch (Exception e) {
+                throw new RuntimeException("General Exception executing function: " + e);
+            }
+        }
+    }
+
+    static public class Intermed extends EvalFunc<Tuple> {
+        private static final Log log = LogFactory.getLog(Intermed.class);
+        private final Random randomizer = new Random();
+        /* The input is a tuple that contains a single bag.
+         * This bag contains outputs of the Initial step --
+         * tuples of the format (limit, index, { top_tuples })
+         * 
+         * We need to take the top of tops and return a similar tuple.
+         * 
+         * (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            if (input == null || input.size() < 1) {
+                return null;
+            }
+            try {
+                DataBag bagOfIntermediates = (DataBag) input.get(0);
+                Iterator<Tuple> intermediateIterator = bagOfIntermediates.iterator();
+                if (!intermediateIterator.hasNext()) {
+                    return null;
+                }
+                Tuple peekTuple = intermediateIterator.next();
+                if (peekTuple == null || peekTuple.size() < 3 ) return null;
+                int n = (Integer) peekTuple.get(0);
+                int fieldNum = (Integer) peekTuple.get(1);
+                DataBag inputBag = (DataBag) peekTuple.get(2);
 
-  BagFactory mBagFactory = BagFactory.getInstance();
+                PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                        new TupleComparator(fieldNum));
 
-  static class TupleComparator implements Comparator<Tuple> {
-    private int fieldNum;
+                updateTop(store, n, inputBag);
 
-    public TupleComparator(int fieldNum) {
-      this.fieldNum = fieldNum;
-    }
-
-    @Override
-    public int compare(Tuple o1, Tuple o2) {
-      if (o1 == null)
-        return -1;
-      if (o2 == null)
-        return 1;
-      int retValue = 1;
-      try {
-        long count1 = (Long) o1.get(fieldNum);
-        long count2 = (Long) o2.get(fieldNum);
-        retValue = (count1 > count2) ? 1 : ((count1 == count2) ? 0 : -1);
-      } catch (ExecException e) {
-        throw new RuntimeException("Error while comparing o1:" + o1
-            + " and o2:" + o2, e);
-      }
-      return retValue;
-    }
-  }
-
-  @Override
-  public DataBag exec(Tuple tuple) throws IOException {
-    if (tuple == null || tuple.size() < 3) {
-      return null;
-    }
-    try {
-      int n = (Integer) tuple.get(0);
-      int fieldNum = (Integer) tuple.get(1);
-      DataBag inputBag = (DataBag) tuple.get(2);
-      PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
-          new TupleComparator(fieldNum));
-      Iterator<Tuple> itr = inputBag.iterator();
-      while (itr.hasNext()) {
-        Tuple t = itr.next();
-        store.add(t);
-        if (store.size() > n)
-          store.poll();
-      }
-      DataBag outputBag = mBagFactory.newDefaultBag();
-      for (Tuple t : store) {
-        outputBag.add(t);
-      }
-      return outputBag;
-    } catch (ExecException e) {
-      throw new RuntimeException("ExecException executing function: ", e);
-    } catch (Exception e) {
-      throw new RuntimeException("General Exception executing function: " + e);
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
-   */
-  @Override
-  public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
-    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
-    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
-        new Schema.FieldSchema(null, DataType.INTEGER))));
-    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
-        new Schema.FieldSchema(null, DataType.INTEGER))));
-    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
-        new Schema.FieldSchema(null, DataType.BAG))));
-    return funcList;
-  }
-
-  @Override
-  public Schema outputSchema(Schema input) {
-    try {
-      if (input.size() < 3) {
-        return null;
-      }
-      Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_input_tuples",
-          input.getField(2).schema, DataType.BAG);
-      return new Schema(bagFs);
+                while (intermediateIterator.hasNext()) {
+                    Tuple t = intermediateIterator.next();
+                    if (t == null || t.size() < 3 ) continue;
+                    updateTop(store, n, (DataBag) t.get(2));
+                }   
 
-    } catch (Exception e) {
-      return null;
+                DataBag outputBag = mBagFactory.newDefaultBag();
+                for (Tuple t : store) {
+                    outputBag.add(t);
+                }
+                Tuple retTuple = mTupleFactory.newTuple(3);
+                retTuple.set(0, n);
+                retTuple.set(1,fieldNum);
+                retTuple.set(2, outputBag);
+                if (log.isDebugEnabled()) { 
+                    if (randomizer.nextInt(1000) == 1) log.debug("outputting "+retTuple.toDelimitedString("\t")); 
+                }
+                return retTuple;
+            } catch (ExecException e) {
+                throw new RuntimeException("ExecException executing function: ", e);
+            } catch (Exception e) {
+                throw new RuntimeException("General Exception executing function: " + e);
+            }
+        }
+        
+    }
+    
+    static public class Final extends EvalFunc<DataBag> {
+
+        private static final Log log = LogFactory.getLog(Final.class);
+        private final Random randomizer = new Random();
+
+
+
+        /*
+         * The input to this function is a tuple that contains a single bag.
+         * This bag, in turn, contains outputs of the Intermediate step -- 
+         * tuples of the format (limit, index, { top_tuples } )
+         * 
+         * we want to return a bag of top tuples
+         * 
+         * (non-Javadoc)
+         * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public DataBag exec(Tuple tuple) throws IOException {
+            if (tuple == null || tuple.size() < 1) {
+                return null;
+            }
+            try {
+                DataBag bagOfIntermediates = (DataBag) tuple.get(0);
+                Iterator<Tuple> intermediateIterator = bagOfIntermediates.iterator();
+                if (!intermediateIterator.hasNext()) {
+                    return null;
+                }
+                Tuple peekTuple = intermediateIterator.next();
+                if (peekTuple == null || peekTuple.size() < 3 ) return null;
+                int n = (Integer) peekTuple.get(0);
+                int fieldNum = (Integer) peekTuple.get(1);
+                DataBag inputBag = (DataBag) peekTuple.get(2);
+
+                PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                        new TupleComparator(fieldNum));
+
+                updateTop(store, n, inputBag);
+
+                while (intermediateIterator.hasNext()) {
+                    Tuple t = intermediateIterator.next();
+                    if (t == null || t.size() < 3 ) continue;
+                    updateTop(store, n, (DataBag) t.get(2));
+                }   
+
+                DataBag outputBag = mBagFactory.newDefaultBag();
+                for (Tuple t : store) {
+                    outputBag.add(t);
+                }
+                if (log.isDebugEnabled()) {
+                    if (randomizer.nextInt(1000) == 1) for (Tuple t : outputBag) log.debug("outputting "+t.toDelimitedString("\t"));
+                }
+                return outputBag;
+            } catch (ExecException e) {
+                throw new RuntimeException("ExecException executing function: ", e);
+            } catch (Exception e) {
+                throw new RuntimeException("General Exception executing function: " + e);
+            }
+        }
     }
-  }
 }
+

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java Thu Feb 11 19:34:21 2010
@@ -17,46 +17,82 @@
  */
 package org.apache.pig.piggybank.test.evaluation.util;
 
-import java.util.Iterator;
+import java.io.IOException;
 
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.piggybank.evaluation.util.Top;
 import org.junit.Test;
 
-import junit.framework.TestCase;
-
 public class TestTop extends TestCase {
 
-  @Test
-  public void testTop() throws Exception {
-    Top top = new Top();
-    Tuple inputTuple = DefaultTupleFactory.getInstance().newTuple(3);
-    // set N = 10 i.e retain top 10 tuples
-    inputTuple.set(0, 10);
-    // compare tuples by field number 1
-    inputTuple.set(1, 1);
-    // set the data bag containing the tuples
-    DataBag dBag = DefaultBagFactory.getInstance().newDefaultBag();
-    inputTuple.set(2, dBag);
-    // generate tuples of the form (group-1, 1), (group-2, 2) ...
-    for (long i = 0; i < 100; i++) {
-      Tuple nestedTuple = DefaultTupleFactory.getInstance().newTuple(2);
-      nestedTuple.set(0, "group-" + i);
-      nestedTuple.set(1, i);
-      dBag.add(nestedTuple);
+    Top top_ = new Top();
+    TupleFactory tupleFactory_ = DefaultTupleFactory.getInstance();
+    BagFactory bagFactory_ = DefaultBagFactory.getInstance();
+    Tuple inputTuple_ = tupleFactory_.newTuple(3);
+    DataBag dBag_ = bagFactory_.newDefaultBag();
+
+    public void setup() throws ExecException {
+        // set N = 10 i.e retain top 10 tuples
+        inputTuple_.set(0, 10);
+        // compare tuples by field number 1
+        inputTuple_.set(1, 1);
+        // set the data bag containing the tuples
+        inputTuple_.set(2, dBag_);
+
+        // generate tuples of the form (group-1, 1), (group-2, 2) ...
+        for (long i = 0; i < 100; i++) {
+            Tuple nestedTuple = tupleFactory_.newTuple(2);
+            nestedTuple.set(0, "group-" + i);
+            nestedTuple.set(1, i);
+            dBag_.add(nestedTuple);
+        }     
+    }
+    @Test
+    public void testTopExec() throws Exception {
+        setup();
+        DataBag outBag = top_.exec(inputTuple_);
+        assertEquals(outBag.size(), 10L);
+        checkItemsGT(outBag, 1, 89);
+    }
+
+    @Test
+    public void testTopAlgebraic() throws IOException {
+        setup();
+        // two initial results
+        Tuple init1 = (new Top.Initial()).exec(inputTuple_);
+        Tuple init2 = (new Top.Initial()).exec(inputTuple_);
+        // two intermediate results
+
+        DataBag intermedBag = bagFactory_.newDefaultBag();
+        intermedBag.add(init1);
+        intermedBag.add(init2);
+        Tuple intermedInput = tupleFactory_.newTuple(intermedBag);
+        Tuple intermedOutput1 = (new Top.Intermed()).exec(intermedInput);
+        Tuple intermedOutput2 = (new Top.Intermed()).exec(intermedInput);
+        checkItemsGT((DataBag)intermedOutput1.get(2), 1, 94);
+
+        // final result
+        DataBag finalInputBag = bagFactory_.newDefaultBag();
+        finalInputBag.add(intermedOutput1);
+        finalInputBag.add(intermedOutput2);
+        Tuple finalInput = tupleFactory_.newTuple(finalInputBag);
+        DataBag outBag = (new Top.Final()).exec(finalInput);
+        assertEquals(outBag.size(), 10L);
+        checkItemsGT(outBag, 1, 96);
     }
 
-    DataBag outBag = top.exec(inputTuple);
-    super.assertEquals(outBag.size(), 10L);
-    Iterator<Tuple> itr = outBag.iterator();
-    while (itr.hasNext()) {
-      Tuple next = itr.next();
-      Long value = (Long) next.get(1);
-      super.assertTrue("Value " + value + " exceeded the expected limit",
-          value > 89);
+    private void checkItemsGT(Iterable<Tuple> tuples, int field, int limit) throws ExecException {
+        for (Tuple t : tuples) {
+            Long val = (Long) t.get(field);
+            assertTrue("Value "+ val + " exceeded the expected limit", val > limit);
+        }
     }
-  }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Thu Feb 11 19:34:21 2010
@@ -298,11 +298,22 @@
      * @param o2 Second object
      * @return -1 if o1 is less, 0 if they are equal, 1 if o2 is less.
      */
-    @SuppressWarnings("unchecked")
     public static int compare(Object o1, Object o2) {
+
         byte dt1 = findType(o1);
         byte dt2 = findType(o2);
+        return compare(o1, o2, dt1, dt2);
+    }
 
+    /*
+     * Same as compare(Object o1, Object o2), but does not use reflection to determine the type 
+     * of passed in objects, relying instead on the caller to provide the appropriate values, as
+     * determined by DataType.findType(Object o);
+     * 
+     * Use this version in cases where multiple objects of the same type have to be repeatedly compared.
+     */
+    @SuppressWarnings("unchecked")
+    public static int compare(Object o1, Object o2, byte dt1, byte dt2) {
         if (dt1 == dt2) {
             switch (dt1) {
             case NULL:


Reply via email to