Author: dvryaboy
Date: Tue Aug 24 21:11:16 2010
New Revision: 988730

URL: http://svn.apache.org/viewvc?rev=988730&view=rev
Log:
PIG-1551 Improve dynamic invokers to deal with no-arg methods and array 
parameters

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Aug 24 21:11:16 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1551: Improve dynamic invokers to deal with no-arg methods and array 
parameters (dvryaboy)
+
 PIG-1311:  Document audience and stability for remaining interfaces (gates)
 
 PIG-506: Does pig need a NATIVE keyword? (aniket486 via thejas)

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java Tue Aug 24 
21:11:16 2010
@@ -32,9 +32,18 @@ import org.apache.pig.impl.logicalLayer.
  * Class-specific non-generic extensions of this class are needed for Pig to 
know what type
  * of return to expect from exec, and to find the appropriate classes through 
reflection.
  * All they have to do is implement the constructors that call into super(). 
Note that the 
- * no-parameter constructor is <b>required</b>, if nonsensical, for Pig to do 
its work.
+ * no-parameter constructor is <b>required</b>, if seemingly nonsensical, for 
Pig to do its work.
+ * <p>
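+ * The Invoker family of UDFs understands the following class names (all
+ * case-insensitive):
+ * <ul>
+ * <li>String
+ * <li>Long
+ * <li>Float
+ * <li>Double
+ * <li>Int
+ * </ul>
+ * <p>
+ * Invokers can also work with array arguments, represented in Pig as DataBags
+ * whose tuples each carry one element value in their first field. Simply refer to
+ * <code>string[]</code>, for example; the supported array names are
+ * <code>string[]</code>, <code>int[]</code>, <code>long[]</code>,
+ * <code>float[]</code> and <code>double[]</code>.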
  * <p>
- * 
  * This UDF allows one to dynamically invoke Java methods that return a 
<code>T</code>
  * <p>
  * Usage of the Invoker family of UDFs (adjust as appropriate):
@@ -54,6 +63,7 @@ import org.apache.pig.impl.logicalLayer.
  * The first argument to the constructor is the full path to desired 
method.<br>
  * The second argument is a list of classes of the method parameters.<br>
  * If the method is not static, the first element in this list is the object 
to invoke the method on.<br>
+ * The second argument is optional (a no-argument static method is assumed if 
it is not supplied).<br>
  * The third argument is the keyword "static" (or "true") to signify that the 
method is static. <br>
  * The third argument is optional, and true by default.<br>
  * <p>
@@ -65,6 +75,11 @@ public abstract class GenericInvoker<T> 
 
     public GenericInvoker() {}
 
+    public GenericInvoker(String fullName) 
+    throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
+      invoker_ = new Invoker<T>(fullName, "");
+    }
+
     public GenericInvoker(String fullName, String paramSpecsStr)
     throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
         invoker_ = new Invoker<T>(fullName, paramSpecsStr);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java Tue Aug 24 
21:11:16 2010
@@ -26,6 +26,10 @@ public class InvokeForDouble extends Gen
 
      public InvokeForDouble() {}
 
+     /**
+      * Creates an invoker for a static method that takes no parameters.
+      *
+      * @param fullName fully qualified method name (class name, a dot, method name)
+      */
+     public InvokeForDouble(String fullName)
+         throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+         super(fullName);
+     }
+     
      public InvokeForDouble(String fullName, String paramSpecsStr) throws 
FrontendException, SecurityException, ClassNotFoundException, 
NoSuchMethodException {
          super(fullName, paramSpecsStr);
      }
@@ -34,4 +38,6 @@ public class InvokeForDouble extends Gen
      throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
          super(fullName, paramSpecsStr, isStatic);
      }
+
+
  }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java Tue Aug 24 
21:11:16 2010
@@ -27,6 +27,10 @@ public class InvokeForFloat extends Gene
 
      public InvokeForFloat() {}
 
+     /**
+      * Creates an invoker for a static method that takes no parameters.
+      *
+      * @param fullName fully qualified method name (class name, a dot, method name)
+      */
+     public InvokeForFloat(String fullName)
+         throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+         super(fullName);
+     }
+     
      public InvokeForFloat(String fullName, String paramSpecsStr) throws 
FrontendException, SecurityException, ClassNotFoundException, 
NoSuchMethodException {
          super(fullName, paramSpecsStr);
      }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java Tue Aug 24 
21:11:16 2010
@@ -30,6 +30,10 @@ public class InvokeForInt extends Generi
         super(fullName, paramSpecsStr);
     }
 
+    /**
+     * Creates an invoker for a static method that takes no parameters.
+     *
+     * @param fullName fully qualified method name (class name, a dot, method name)
+     */
+    public InvokeForInt(String fullName)
+        throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+        super(fullName);
+    }
+    
     public InvokeForInt(String fullName, String paramSpecsStr, String isStatic)
     throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
         super(fullName, paramSpecsStr, isStatic);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java Tue Aug 24 
21:11:16 2010
@@ -26,6 +26,10 @@ public class InvokeForLong extends Gener
 
     public InvokeForLong() {}
 
+    /**
+     * Creates an invoker for a static method that takes no parameters.
+     *
+     * @param fullName fully qualified method name (class name, a dot, method name)
+     */
+    public InvokeForLong(String fullName)
+        throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+        super(fullName);
+    }
+    
     public InvokeForLong(String fullName, String paramSpecsStr) throws 
FrontendException, SecurityException, ClassNotFoundException, 
NoSuchMethodException {
         super(fullName, paramSpecsStr);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java Tue Aug 24 
21:11:16 2010
@@ -31,6 +31,10 @@ public class InvokeForString extends Gen
         super(fullName, paramSpecsStr);
     }
 
+    /**
+     * Creates an invoker for a static method that takes no parameters.
+     *
+     * @param fullName fully qualified method name (class name, a dot, method name)
+     */
+    public InvokeForString(String fullName)
+        throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+        super(fullName);
+    }
+    
     public InvokeForString(String fullName, String paramSpecsStr, String 
isStatic)
     throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
         super(fullName, paramSpecsStr, isStatic);

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java Tue Aug 24 
21:11:16 2010
@@ -23,13 +23,34 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class Invoker<T>  {
 
+    private static final Log LOG = LogFactory.getLog(Invoker.class);
+
+    // Array parameter types the invokers can accept. Pig supplies such arguments as
+    // DataBags, which are unpacked into these array types before the reflective call.
+    // Class literals avoid allocating a throw-away array just to obtain its Class.
+    private static final Class<?> DOUBLE_ARRAY_CLASS = double[].class;
+    private static final Class<?> INT_ARRAY_CLASS = int[].class;
+    private static final Class<?> FLOAT_ARRAY_CLASS = float[].class;
+    private static final Class<?> STRING_ARRAY_CLASS = String[].class;
+    private static final Class<?> LONG_ARRAY_CLASS = long[].class;
+
+    // Generic varargs creation of Class<?> elements triggers an unchecked warning.
+    @SuppressWarnings("unchecked")
+    private static final Set<Class<?>> ARRAY_CLASSES = Sets.newHashSet(
+        DOUBLE_ARRAY_CLASS, INT_ARRAY_CLASS, FLOAT_ARRAY_CLASS, STRING_ARRAY_CLASS,
+        LONG_ARRAY_CLASS);
+
     private Method method_;
     private Class<?>[] paramClasses_;
 
@@ -37,15 +58,17 @@ public class Invoker<T>  {
     private Class<?> selfClass_;
     private Type returnType_;
 
-    public Invoker(String fullName, String paramSpecsStr) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
+    /**
+     * Creates an invoker for a static method; shorthand for
+     * {@code Invoker(fullName, paramSpecsStr, "true")}.
+     *
+     * @param fullName      fully qualified method name (class name, a dot, method name)
+     * @param paramSpecsStr space-separated parameter type names, or "" for none
+     */
+    public Invoker(String fullName, String paramSpecsStr)
+        throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
         this(fullName, paramSpecsStr, "true");
     }
 
-    public Invoker(String fullName, String paramSpecsStr, String isStatic) 
throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
+    public Invoker(String fullName, String paramSpecsStr, String isStatic) 
+    throws ClassNotFoundException, FrontendException, SecurityException, 
NoSuchMethodException {
         String className = fullName.substring(0, fullName.lastIndexOf('.'));
         String methodName = fullName.substring(fullName.lastIndexOf('.')+1);
         Class<?> klazz = Class.forName(className);
-        String[] paramSpecs = paramSpecsStr.split(" ");
+        String[] paramSpecs = "".equals(paramSpecsStr) ? new String[0] : 
paramSpecsStr.split(" ");
         isStatic_ = "static".equalsIgnoreCase(isStatic) || 
"true".equals(isStatic);
         paramClasses_ = new Class<?>[paramSpecs.length];
         for (int i = 0; i < paramSpecs.length; i++) {
@@ -58,8 +81,9 @@ public class Invoker<T>  {
         returnType_ = method_.getGenericReturnType();
     }
 
+    /**
+     * Returns the declared return type of the target method.
+     * <p>
+     * When that type is a {@code Class} it is passed through {@link #unPrimitivize},
+     * so e.g. {@code double} is reported as {@code Double}. Generic return types
+     * (for example {@code List<String>}, a {@code ParameterizedType}) are not
+     * {@code Class} instances and are returned unchanged instead of failing with a
+     * {@code ClassCastException}.
+     */
     public Type getReturnType() {
-        return returnType_;
+        if (returnType_ instanceof Class) {
+            return unPrimitivize((Class<?>) returnType_);
+        }
+        return returnType_;
     }
 
     private static Class<?>[] dropFirstClass(Class<?>[] original) {
@@ -89,8 +113,18 @@ public class Invoker<T>  {
             return Float.TYPE;
         } else if ("long".equalsIgnoreCase(klass)) {
             return Long.TYPE;
+        } else if ("double[]".equalsIgnoreCase(klass)) {
+          return DOUBLE_ARRAY_CLASS;
+        } else if ("int[]".equalsIgnoreCase(klass)) {
+          return INT_ARRAY_CLASS;
+        } else if ("long[]".equalsIgnoreCase(klass)) {
+          return LONG_ARRAY_CLASS;
+        } else if ("float[]".equalsIgnoreCase(klass)) {
+          return FLOAT_ARRAY_CLASS;
+        } else if ("string[]".equalsIgnoreCase(klass)) {
+          return STRING_ARRAY_CLASS;
         } else { 
-            throw new FrontendException("unable to find mathing class for " + 
klass);
+            throw new FrontendException("unable to find matching class for " + 
klass);
         }
 
     }
@@ -105,20 +139,80 @@ public class Invoker<T>  {
         } else if (klass.equals(Double.TYPE)) {
             return Double.class;
         } else {
-            return klass;
+          return klass;
+        }
+    }
+
+    /**
+     * Converts one field of the input tuple into the type expected by the
+     * corresponding parameter of the target method.
+     * <p>
+     * For the supported array parameter types (members of {@code ARRAY_CLASSES})
+     * the value must be a {@link DataBag}. Field 0 of each of its tuples becomes one
+     * array element, in bag iteration order. Any other value is cast to
+     * {@code klass}. A {@code null} value is passed through as {@code null}.
+     *
+     * @param klass parameter class to convert to (the caller maps primitives to
+     *              their wrapper classes first)
+     * @param obj   raw field value from the input tuple; may be {@code null}
+     * @return {@code obj} converted to {@code klass}
+     * @throws ExecException if {@code obj}, or an element of the bag, has an
+     *         unexpected type, or if a tuple field cannot be read
+     */
+    private static <T> T convertToExpectedArg(Class<T> klass, Object obj)
+        throws ExecException {
+        try {
+            // A null field stays null; unpacking it as a bag would throw an NPE.
+            Object converted = (obj != null && ARRAY_CLASSES.contains(klass))
+                ? bagToArray(klass, obj)
+                : obj;
+            return klass.cast(converted);
+        } catch (ClassCastException e) {
+            // Also covers a non-bag value for an array parameter and bag elements of
+            // the wrong type, so callers always see an ExecException.
+            // obj is non-null here: casting null never throws.
+            LOG.error("Error in dynamic argument processing. Casting to: "
+                + klass + " from: " + obj.getClass(), e);
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Unpacks field 0 of every tuple in {@code obj} (expected to be a DataBag) into
+     * a new array of type {@code arrayClass}, which must be one of ARRAY_CLASSES.
+     */
+    private static Object bagToArray(Class<?> arrayClass, Object obj) throws ExecException {
+        DataBag dbag = (DataBag) obj;
+        if (STRING_ARRAY_CLASS.equals(arrayClass)) {
+            List<String> strings = Lists.newArrayList();
+            for (Tuple t : dbag) {
+                strings.add((String) t.get(0));
+            }
+            return strings.toArray(new String[strings.size()]);
+        }
+        List<Number> numbers = bagToNumberList(dbag);
+        int size = numbers.size();
+        if (DOUBLE_ARRAY_CLASS.equals(arrayClass)) {
+            double[] result = new double[size];
+            for (int i = 0; i < size; i++) {
+                result[i] = numbers.get(i).doubleValue();
+            }
+            return result;
+        } else if (INT_ARRAY_CLASS.equals(arrayClass)) {
+            int[] result = new int[size];
+            for (int i = 0; i < size; i++) {
+                result[i] = numbers.get(i).intValue();
+            }
+            return result;
+        } else if (FLOAT_ARRAY_CLASS.equals(arrayClass)) {
+            float[] result = new float[size];
+            for (int i = 0; i < size; i++) {
+                result[i] = numbers.get(i).floatValue();
+            }
+            return result;
+        } else if (LONG_ARRAY_CLASS.equals(arrayClass)) {
+            long[] result = new long[size];
+            for (int i = 0; i < size; i++) {
+                result[i] = numbers.get(i).longValue();
+            }
+            return result;
+        }
+        // Not reached for members of ARRAY_CLASSES; the bag itself is returned so the
+        // caller's cast fails and is reported as an ExecException.
+        return dbag;
+    }
+
+    /**
+     * Collects field 0 of every tuple in the bag, in iteration order, cast to Number.
+     *
+     * @param dbag bag whose tuples hold a numeric value in their first field
+     * @return a new list containing the extracted values
+     * @throws ExecException propagated from reading a tuple field
+     */
+    private static List<Number> bagToNumberList(DataBag dbag) throws ExecException {
+        List<Number> numbers = Lists.newArrayList();
+        for (Tuple tuple : dbag) {
+            Object first = tuple.get(0);
+            numbers.add((Number) first);
+        }
+        return numbers;
     }
 
     private Object[] tupleToArgs(Tuple t) throws ExecException {
-        if ( (t == null && paramClasses_ != null) || (t != null && t.size() != 
paramClasses_.length)) {
+      if ( (t == null && (paramClasses_ != null || paramClasses_.length != 0)) 
+            || (t != null && t.size() < paramClasses_.length)) {
             throw new ExecException("unable to match function arguments to 
declared signature.");
         }
         if (t == null) {
             return null;
         }
-        Object[] args = new Object[t.size()];
-        for (int i = 0; i < t.size(); i++) {
-            args[i] =  unPrimitivize(paramClasses_[i]).cast(t.get(i));
+        Object[] args = new Object[paramClasses_.length];
+        for (int i = 0; i < paramClasses_.length; i++) {
+            args[i] =  convertToExpectedArg(unPrimitivize(paramClasses_[i]), 
t.get(i));
         }
         return args;
     }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java Tue Aug 24 
21:11:16 2010
@@ -29,6 +29,8 @@ import org.apache.pig.builtin.InvokeForF
 import org.apache.pig.builtin.InvokeForInt;
 import org.apache.pig.builtin.InvokeForLong;
 import org.apache.pig.builtin.InvokeForString;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.junit.Test;
@@ -40,6 +42,8 @@ import org.junit.Test;
 public class TestInvoker {
 
     private final TupleFactory tf_ = TupleFactory.getInstance();
+    private final BagFactory bf_ = BagFactory.getInstance();
+
     @Test
     public void testStringInvoker() throws SecurityException, 
ClassNotFoundException, NoSuchMethodException, IOException {
 
@@ -67,6 +71,11 @@ public class TestInvoker {
     }
 
     @Test
+    public void testNoArgInvoker()
+        throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException {
+        // simpleStaticFunction declares no parameters, so only the method name is
+        // given and the invoker is driven with an empty tuple.
+        String methodName = TestInvoker.class.getName() + ".simpleStaticFunction";
+        InvokeForInt noArgInvoker = new InvokeForInt(methodName);
+        Object result = noArgInvoker.exec(tf_.newTuple());
+        assertEquals(Integer.valueOf(1), result);
+    }
+
+    @Test
     public void testLongInvoker() throws SecurityException, 
ClassNotFoundException, NoSuchMethodException, NumberFormatException, 
IOException {
         InvokeForLong il = new InvokeForLong("java.lang.Long.valueOf", 
"String");
         Tuple t = tf_.newTuple(1);
@@ -85,6 +94,21 @@ public class TestInvoker {
     }
 
     @Test
+    public void testArrayConversion()
+        throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException {
+        String testClass = TestInvoker.class.getName();
+
+        // double[] parameter: a bag of Doubles is unpacked into a primitive array.
+        InvokeForInt avgOfDoubles = new InvokeForInt(testClass + ".avg", "double[]");
+        DataBag doubles = newSimpleBag(1.0, 2.0, 3.0);
+        assertEquals(Integer.valueOf(2), avgOfDoubles.exec(tf_.newTuple(doubles)));
+
+        // long[] parameter selects the other avg overload.
+        InvokeForInt avgOfLongs = new InvokeForInt(testClass + ".avg", "long[]");
+        DataBag longs = newSimpleBag(1L, 2L, 3L);
+        assertEquals(Integer.valueOf(2), avgOfLongs.exec(tf_.newTuple(longs)));
+
+        // string[] parameter: a bag of Strings becomes a String[].
+        InvokeForString concat = new InvokeForString(testClass + ".concatStringArray", "string[]");
+        DataBag strings = newSimpleBag("foo", "bar", "baz");
+        assertEquals("foobarbaz", concat.exec(tf_.newTuple(strings)));
+    }
+
+    @Test
     public void testDoubleInvoker() throws SecurityException, 
ClassNotFoundException, NoSuchMethodException, NumberFormatException, 
IOException {
         InvokeForDouble il = new InvokeForDouble("java.lang.Double.valueOf", 
"String");
         Tuple t = tf_.newTuple(1);
@@ -106,6 +130,42 @@ public class TestInvoker {
         return str1.concat(str2);
     }
 
+    /** Concatenates every element of {@code strings}, in order, into one string. */
+    public static String concatStringArray(String[] strings) {
+        StringBuilder joined = new StringBuilder();
+        for (int i = 0; i < strings.length; i++) {
+            joined.append(strings[i]);
+        }
+        return joined.toString();
+    }
+
+    /** No-argument static target for testNoArgInvoker; always returns 1. */
+    public static int simpleStaticFunction() {
+        final int result = 1;
+        return result;
+    }
+    
+    /**
+     * Integer average of {@code nums}, used as the {@code long[]} invocation target.
+     * The division happens in {@code long} before narrowing, so sums larger than
+     * {@code Integer.MAX_VALUE} do not wrap around. Throws ArithmeticException for an
+     * empty array.
+     */
+    public static int avg(long[] nums) {
+        long sum = 0;
+        for (long n : nums) {
+            sum += n;
+        }
+        return (int) (sum / nums.length);
+    }
+
+    /**
+     * Truncated average of {@code nums}, used as the {@code double[]} invocation
+     * target. The division happens before narrowing to {@code int}, so large sums
+     * are not clamped before averaging. An empty array yields NaN, which narrows to 0.
+     */
+    public static int avg(double[] nums) {
+        double sum = 0;
+        for (double d : nums) {
+            sum += d;
+        }
+        return (int) (sum / nums.length);
+    }
+
+    /** Builds a default DataBag with one tuple wrapping each argument, in order. */
+    private DataBag newSimpleBag(Object... objects) {
+        DataBag result = bf_.newDefaultBag();
+        for (int i = 0; i < objects.length; i++) {
+            Object value = objects[i];
+            result.add(tf_.newTuple(value));
+        }
+        return result;
+    }
+
     @Test
     public void testSpeed() throws IOException, SecurityException, 
ClassNotFoundException, NoSuchMethodException {
         EvalFunc<Double> log = new Log();


Reply via email to