Author: dvryaboy Date: Tue Aug 24 21:11:16 2010 New Revision: 988730 URL: http://svn.apache.org/viewvc?rev=988730&view=rev Log: PIG-1551 Improve dynamic invokers to deal with no-arg methods and array parameters
Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Aug 24 21:11:16 2010 @@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1551: Improve dynamic invokers to deal with no-arg methods and array parameters (dvryaboy) + PIG-1311: Document audience and stability for remaining interfaces (gates) PIG-506: Does pig need a NATIVE keyword? (aniket486 via thejas) Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java Tue Aug 24 21:11:16 2010 @@ -32,9 +32,18 @@ import org.apache.pig.impl.logicalLayer. * Class-specific non-generic extensions of this class are needed for Pig to know what type * of return to expect from exec, and to find the appropriate classes through reflection. * All they have to do is implement the constructors that call into super(). Note that the - * no-parameter constructor is <b>required</b>, if nonsensical, for Pig to do its work. + * no-parameter constructor is <b>required</b>, if seemingly nonsensical, for Pig to do its work. + * <p> + * The Invoker family of udfs understand the following class names (all case-independent): + * <li>String + * <li>Long + * <li>Float + * <li>Double + * <li>Int + * <p> + * Invokers can also work with array arguments, represented in Pig as DataBags of single-tuple + * elements. Simply refer to <code>string[]</code>, for example. * <p> - * * This UDF allows one to dynamically invoke Java methods that return a <code>T</code> * <p> * Usage of the Invoker family of UDFs (adjust as appropriate): @@ -54,6 +63,7 @@ import org.apache.pig.impl.logicalLayer. * The first argument to the constructor is the full path to desired method.<br> * The second argument is a list of classes of the method parameters.<br> * If the method is not static, the first element in this list is the object to invoke the method on.<br> + * The second argument is optional (a no-argument static method is assumed if it is not supplied).<br> * The third argument is the keyword "static" (or "true") to signify that the method is static. <br> * The third argument is optional, and true by default.<br> * <p> @@ -65,6 +75,11 @@ public abstract class GenericInvoker<T> public GenericInvoker() {} + public GenericInvoker(String fullName) + throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { + invoker_ = new Invoker<T>(fullName, ""); + } + public GenericInvoker(String fullName, String paramSpecsStr) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { invoker_ = new Invoker<T>(fullName, paramSpecsStr); Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java Tue Aug 24 21:11:16 2010 @@ -26,6 +26,10 @@ public class InvokeForDouble extends Gen public InvokeForDouble() {} + public InvokeForDouble(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { + super(fullName); + } + public InvokeForDouble(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { super(fullName, paramSpecsStr); } @@ -34,4 +38,6 @@ public class InvokeForDouble extends Gen throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { super(fullName, paramSpecsStr, isStatic); } + + } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java Tue Aug 24 21:11:16 2010 @@ -27,6 +27,10 @@ public class InvokeForFloat extends Gene public InvokeForFloat() {} + public InvokeForFloat(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { + super(fullName); + } + public InvokeForFloat(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { super(fullName, paramSpecsStr); } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java Tue Aug 24 21:11:16 2010 @@ -30,6 +30,10 @@ public class InvokeForInt extends Generi super(fullName, paramSpecsStr); } + public InvokeForInt(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { + super(fullName); + } + public InvokeForInt(String fullName, String paramSpecsStr, String isStatic) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { super(fullName, paramSpecsStr, isStatic); Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java Tue Aug 24 21:11:16 2010 @@ -26,6 +26,10 @@ public class InvokeForLong extends Gener public InvokeForLong() {} + public InvokeForLong(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { + super(fullName); + } + public InvokeForLong(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { super(fullName, paramSpecsStr); } Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java Tue Aug 24 21:11:16 2010 @@ -31,6 +31,10 @@ public class InvokeForString extends Gen super(fullName, paramSpecsStr); } + public InvokeForString(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException { + super(fullName); + } + public InvokeForString(String fullName, String paramSpecsStr, String isStatic) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { super(fullName, paramSpecsStr, isStatic); Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java Tue Aug 24 21:11:16 2010 @@ -23,13 +23,34 @@ import java.lang.reflect.InvocationTarge import java.lang.reflect.Method; import java.lang.reflect.Type; import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + public class Invoker<T> { + private static final Log LOG = LogFactory.getLog(Invoker.class); + + private static final Class<?> DOUBLE_ARRAY_CLASS = new double[0].getClass(); + private static final Class<?> INT_ARRAY_CLASS = new int[0].getClass(); + private static final Class<?> FLOAT_ARRAY_CLASS = new float[0].getClass(); + private static final Class<?> STRING_ARRAY_CLASS = new String[0].getClass(); + private static final Class<?> LONG_ARRAY_CLASS = new long[0].getClass(); + + @SuppressWarnings("unchecked") + private static final Set<Class<?>> ARRAY_CLASSES = Sets.newHashSet( + DOUBLE_ARRAY_CLASS, INT_ARRAY_CLASS, FLOAT_ARRAY_CLASS, STRING_ARRAY_CLASS, + LONG_ARRAY_CLASS); + private Method method_; private Class<?>[] paramClasses_; @@ -37,15 +58,17 @@ public class Invoker<T> { private Class<?> selfClass_; private Type returnType_; - public Invoker(String fullName, String paramSpecsStr) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { + public Invoker(String fullName, String paramSpecsStr) + throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { this(fullName, paramSpecsStr, "true"); } - public Invoker(String fullName, String paramSpecsStr, String isStatic) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { + public Invoker(String fullName, String paramSpecsStr, String isStatic) + throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException { String className = fullName.substring(0, fullName.lastIndexOf('.')); String methodName = fullName.substring(fullName.lastIndexOf('.')+1); Class<?> klazz = Class.forName(className); - String[] paramSpecs = paramSpecsStr.split(" "); + String[] paramSpecs = "".equals(paramSpecsStr) ? new String[0] : paramSpecsStr.split(" "); isStatic_ = "static".equalsIgnoreCase(isStatic) || "true".equals(isStatic); paramClasses_ = new Class<?>[paramSpecs.length]; for (int i = 0; i < paramSpecs.length; i++) { @@ -58,8 +81,9 @@ public class Invoker<T> { returnType_ = method_.getGenericReturnType(); } + @SuppressWarnings("rawtypes") public Type getReturnType() { - return returnType_; + return unPrimitivize((Class) returnType_); } private static Class<?>[] dropFirstClass(Class<?>[] original) { @@ -89,8 +113,18 @@ public class Invoker<T> { return Float.TYPE; } else if ("long".equalsIgnoreCase(klass)) { return Long.TYPE; + } else if ("double[]".equalsIgnoreCase(klass)) { + return DOUBLE_ARRAY_CLASS; + } else if ("int[]".equalsIgnoreCase(klass)) { + return INT_ARRAY_CLASS; + } else if ("long[]".equalsIgnoreCase(klass)) { + return LONG_ARRAY_CLASS; + } else if ("float[]".equalsIgnoreCase(klass)) { + return FLOAT_ARRAY_CLASS; + } else if ("string[]".equalsIgnoreCase(klass)) { + return STRING_ARRAY_CLASS; } else { - throw new FrontendException("unable to find mathing class for " + klass); + throw new FrontendException("unable to find matching class for " + klass); } } @@ -105,20 +139,80 @@ public class Invoker<T> { } else if (klass.equals(Double.TYPE)) { return Double.class; } else { - return klass; + return klass; + } + } + + private static <T> T convertToExpectedArg(Class<T> klass, Object obj) throws ExecException { + if (ARRAY_CLASSES.contains(klass)) { + DataBag dbag = (DataBag) obj; + if (STRING_ARRAY_CLASS.equals(klass)) { + List<String> dataList = Lists.newArrayList(); + for (Tuple t : dbag) { + dataList.add( (String) t.get(0)); + } + String[] dataArray = new String[dataList.size()]; + for (int i = 0; i < dataList.size(); i++) { + dataArray[i] = dataList.get(i); + } + obj = dataArray; + } else { + List<Number> dataList = bagToNumberList(dbag); + if (DOUBLE_ARRAY_CLASS.equals(klass)) { + double[] dataArray = new double[dataList.size()]; + for (int i = 0; i < dataList.size(); i++) { + dataArray[i] = dataList.get(i).doubleValue(); + } + obj = dataArray; + } else if (INT_ARRAY_CLASS.equals(klass)) { + int[] dataArray = new int[dataList.size()]; + for (int i = 0; i < dataList.size(); i++) { + dataArray[i] = dataList.get(i).intValue(); + } + obj = dataArray; + } else if (FLOAT_ARRAY_CLASS.equals(klass)) { + float[] dataArray = new float[dataList.size()]; + for (int i = 0; i < dataList.size(); i++) { + dataArray[i] = dataList.get(i).floatValue(); + } + obj = dataArray; + } else if (LONG_ARRAY_CLASS.equals(klass)) { + long[] dataArray = new long[dataList.size()]; + for (int i = 0; i < dataList.size(); i++) { + dataArray[i] = dataList.get(i).longValue(); + } + obj = dataArray; + } } + } + try { + return klass.cast(obj); + } catch (ClassCastException e) { + LOG.error("Error in dynamic argument processing. Casting to: " + + klass + " from: " + obj.getClass(), e); + throw new ExecException(e); + } + } + + private static List<Number> bagToNumberList(DataBag dbag) throws ExecException { + List<Number> dataList = Lists.newArrayList(); + for (Tuple t : dbag) { + dataList.add( (Number) t.get(0)); + } + return dataList; } private Object[] tupleToArgs(Tuple t) throws ExecException { - if ( (t == null && paramClasses_ != null) || (t != null && t.size() != paramClasses_.length)) { + if ( (t == null && (paramClasses_ != null || paramClasses_.length != 0)) + || (t != null && t.size() < paramClasses_.length)) { throw new ExecException("unable to match function arguments to declared signature."); } if (t == null) { return null; } - Object[] args = new Object[t.size()]; - for (int i = 0; i < t.size(); i++) { - args[i] = unPrimitivize(paramClasses_[i]).cast(t.get(i)); + Object[] args = new Object[paramClasses_.length]; + for (int i = 0; i < paramClasses_.length; i++) { + args[i] = convertToExpectedArg(unPrimitivize(paramClasses_[i]), t.get(i)); } return args; } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java?rev=988730&r1=988729&r2=988730&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java Tue Aug 24 21:11:16 2010 @@ -29,6 +29,8 @@ import org.apache.pig.builtin.InvokeForF import org.apache.pig.builtin.InvokeForInt; import org.apache.pig.builtin.InvokeForLong; import org.apache.pig.builtin.InvokeForString; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.junit.Test; @@ -40,6 +42,8 @@ import org.junit.Test; public class TestInvoker { private final TupleFactory tf_ = TupleFactory.getInstance(); + private final BagFactory bf_ = BagFactory.getInstance(); + @Test public void testStringInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException { @@ -67,6 +71,11 @@ public class TestInvoker { } @Test + public void testNoArgInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException { + InvokeForInt id = new InvokeForInt(TestInvoker.class.getName() + ".simpleStaticFunction"); + assertEquals(Integer.valueOf(1), id.exec(tf_.newTuple())); + } + @Test public void testLongInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, NumberFormatException, IOException { InvokeForLong il = new InvokeForLong("java.lang.Long.valueOf", "String"); Tuple t = tf_.newTuple(1); @@ -85,6 +94,21 @@ public class TestInvoker { } @Test + public void testArrayConversion() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException { + InvokeForInt ii = new InvokeForInt(TestInvoker.class.getName() + ".avg", "double[]"); + DataBag nums = newSimpleBag(1.0, 2.0, 3.0); + assertEquals(Integer.valueOf(2), ii.exec(tf_.newTuple(nums))); + + ii = new InvokeForInt(TestInvoker.class.getName() + ".avg", "long[]"); + nums = newSimpleBag(1L, 2L, 3L); + assertEquals(Integer.valueOf(2), ii.exec(tf_.newTuple(nums))); + + InvokeForString is = new InvokeForString(TestInvoker.class.getName() + ".concatStringArray", "string[]"); + DataBag strings = newSimpleBag("foo", "bar", "baz"); + assertEquals("foobarbaz", is.exec(tf_.newTuple(strings))); + } + + @Test public void testDoubleInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, NumberFormatException, IOException { InvokeForDouble il = new InvokeForDouble("java.lang.Double.valueOf", "String"); Tuple t = tf_.newTuple(1); @@ -106,6 +130,42 @@ public class TestInvoker { return str1.concat(str2); } + public static String concatStringArray(String[] strings) { + StringBuilder sb = new StringBuilder(); + for (String s : strings) { + sb.append(s); + } + return sb.toString(); + } + + public static int simpleStaticFunction() { + return 1; + } + + public static int avg(long[] nums) { + long sum = 0; + for (long d: nums) { + sum += d; + } + return (int) sum/nums.length; + } + + public static int avg(double[] nums) { + double sum = 0; + for (double d: nums) { + sum += d; + } + return (int) sum/nums.length; + } + + private DataBag newSimpleBag(Object... objects) { + DataBag bag = bf_.newDefaultBag(); + for (Object o : objects) { + bag.add(tf_.newTuple(o)); + } + return bag; + } + @Test public void testSpeed() throws IOException, SecurityException, ClassNotFoundException, NoSuchMethodException { EvalFunc<Double> log = new Log();