Author: dvryaboy
Date: Tue Jun 22 07:04:41 2010
New Revision: 956794

URL: http://svn.apache.org/viewvc?rev=956794&view=rev
Log:
PIG-1427: Monitor and kill runaway UDFs

Added:
    hadoop/pig/trunk/lib/guava-r03.jar   (with props)
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 22 07:04:41 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1427: Monitor and kill runaway UDFs (dvryaboy)
+
 PIG-1428: Make a StatusReporter singleton available for incrementing counters 
(dvryaboy)
 
 PIG-972: Make describe work with nested foreach (aniket486 via daijy)

Modified: hadoop/pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Tue Jun 22 07:04:41 2010
@@ -51,6 +51,7 @@
     <property name="hbase.jarfile" value="hbase-0.20.0.jar" />
     <property name="hbase.test.jarfile" value="hbase-0.20.0-test.jar" />
        <property name="zookeeper.jarfile" value="zookeeper-hbase-1329.jar" />
+       <property name="guava.jarfile" value="guava-r03.jar" />
        
     <!-- javac properties -->
     <property name="javac.debug" value="on" />
@@ -167,6 +168,7 @@
        <path refid="compile.classpath"/>       
         <fileset file="${lib.dir}/${hadoop.jarfile}" />
         <fileset file="${lib.dir}/${hbase.jarfile}" />
+               <fileset file="${lib.dir}/${guava.jarfile}" />
         <fileset file="${lib.dir}/${hbase.test.jarfile}" />
        <fileset file="${lib.dir}/${zookeeper.jarfile}"/>
        <fileset 
file="${ivy.lib.dir}/jackson-mapper-asl-${jackson.version}.jar"/>
@@ -178,6 +180,7 @@
     <!-- javadoc-classpath -->
     <path id="javadoc-classpath">
         <fileset file="${lib.dir}/${hbase.jarfile}" />
+               <fileset file="${lib.dir}/${guava.jarfile}" />
         <fileset file="${lib.dir}/${hbase.test.jarfile}" />
        <path refid="javadoc.classpath"/>       
     </path> 

Added: hadoop/pig/trunk/lib/guava-r03.jar
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/guava-r03.jar?rev=956794&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/pig/trunk/lib/guava-r03.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ hadoop/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Tue Jun 
22 07:04:41 2010
@@ -1229,7 +1229,6 @@ public class IntMax extends EvalFunc&lt;
 <p>One problem that users run into is when they make assumption about how many 
times a constructor for their UDF is called. For instance, they might be 
creating side files in the store function and doing it in the constructor seems 
like a good idea. The problem with this approach is that in most cases Pig 
instantiates functions on the client side to, for instance, examine the schema 
of the data.  </p>
 <p>Users should not make assumptions about how many times a function is 
instantiated; instead, they should make their code resilient to multiple 
instantiations. For instance, they could check if the files exist before 
creating them. </p>
 
-
 </section>
 
 <section>
@@ -1250,6 +1249,45 @@ public class IntMax extends EvalFunc&lt;
 <p>To store information, the UDF calls getUDFProperties. This returns a 
Properties object which the UDF can record the information in or read the 
information from. To avoid name space conflicts UDFs are required to provide a 
signature when obtaining a Properties object. This can be done in two ways. The 
UDF can provide its Class object (via this.getClass()). In this case, every 
instantiation of the UDF will be given the same Properties object. The UDF can 
also provide its Class plus an array of Strings. The UDF can pass its 
constructor arguments, or some other identifying strings. This allows each 
instantiation of the UDF to have a different properties object thus avoiding 
name space collisions between instantiations of the UDF.</p>
 </section>
 
+<section>
+<title>Monitoring long-running UDFs</title>
+<p>Sometimes one may discover that a UDF that executes very quickly in the 
vast majority of cases turns out to run exceedingly slowly on occasion. This 
can happen, for example, if a UDF uses complex regular expressions to parse 
free-form strings, or if a UDF uses some external service to communicate with. 
As of version 0.8, Pig provides a facility for monitoring the length of time a 
UDF is executing for every invocation, and terminating its execution if it runs 
too long. This facility can be turned on using a simple Java annotation:</p>
+       
+<source>
+       import org.apache.pig.builtin.MonitoredUDF;
+       
+       @MonitoredUDF
+       public class MyUDF extends EvalFunc&lt;Integer&gt; {
+         /* implementation goes here */
+       }
+</source>
+
+<p>Simply annotating your UDF in this way will cause Pig to terminate the 
UDF's exec() method if it runs for more than 10 seconds, and return the default 
value of null. The duration of the timeout and the default value can be 
specified in the annotation, if desired:</p>
+
+<source>
+       import org.apache.pig.builtin.MonitoredUDF;
+       
+       @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, 
intDefault = 10)
+       public class MyUDF extends EvalFunc&lt;Integer&gt; {
+         /* implementation goes here */
+       }
+</source>
+
+<p>intDefault, longDefault, doubleDefault, floatDefault, and stringDefault can 
be specified in the annotation; the correct default will be chosen based on the 
return type of the UDF. Custom defaults for tuples and bags are not supported 
at this time.</p>
+
+<p>If desired, custom logic can also be implemented for error handling by 
creating a subclass of MonitoredUDFExecutor.ErrorCallback, and overriding its 
handleError and/or handleTimeout methods. Both of those methods are static, and 
are passed in the instance of the EvalFunc that produced an exception, as well 
as an exception, so you may use any state you have in the UDF to process the 
errors as desired. The default behavior is to increment Hadoop counters every 
time an error is encountered. Once you have an implementation of the 
ErrorCallback that performs your custom logic, you can provide it in the 
annotation:</p>
+
+<source>
+       import org.apache.pig.builtin.MonitoredUDF;
+
+       @MonitoredUDF(errorCallback=MySpecialErrorCallback.class)
+       public class MyUDF extends EvalFunc&lt;Integer&gt; {
+         /* implementation goes here */
+       }
+</source>
+
+<p>Currently the MonitoredUDF annotation works with regular and Algebraic 
UDFs, but has no effect on UDFs that run in the Accumulator mode.</p>
+
 </section>
 
 </body>

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=956794&r1=956793&r2=956794&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Tue Jun 22 07:04:41 2010
@@ -36,6 +36,8 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import org.apache.pig.builtin.MonitoredUDF;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -62,6 +64,9 @@ public class POUserFunc extends Expressi
     public static final byte INTERMEDIATE = 1;
     public static final byte FINAL = 2;
     private boolean initialized = false;
+    private MonitoredUDFExecutor executor = null;
+    
+    
 
     public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
         super(k, rp);
@@ -93,6 +98,9 @@ public class POUserFunc extends Expressi
 
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
+        if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
+            executor = new MonitoredUDFExecutor(func);
+        }
         //the next couple of initializations do not work as intended for the 
following reasons
         //the reporter and pigLogger are member variables of PhysicalOperator
         //when instanitateFunc is invoked at deserialization time, both
@@ -205,8 +213,12 @@ public class POUserFunc extends Expressi
                         result.result = ((Accumulator)func).getValue();        
                         ((Accumulator)func).cleanup();
                     }
-                } else {                                       
+                } else {
+                    if (executor != null) {
+                        result.result = executor.monitorExec((Tuple) 
result.result);
+                    } else {
                     result.result = func.exec((Tuple) result.result);
+                    }
                 }
                 if(resultType == DataType.BYTEARRAY) {
                     if(res.result != null && DataType.findType(result.result) 
!= DataType.BYTEARRAY) {
@@ -369,6 +381,9 @@ public class POUserFunc extends Expressi
 
     public void finish() {
         func.finish();
+        if (executor != null) {
+            executor.terminate();
+        }
     }
 
     public Schema outputSchema(Schema input) {

Added: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=956794&view=auto
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
 (added)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
 Tue Jun 22 07:04:41 2010
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * MonitoredUDFExecutor is used to watch execution of a UDF, and kill it if the UDF 
takes an
+ * exceedingly long time. Null is returned if the UDF times out.
+ * 
+ * Optionally, UDFs can implement the provided interfaces to provide custom 
logic for 
+ * handling errors and default values.
+ * 
+ */
+public class MonitoredUDFExecutor {
+
+    private final ExecutorService exec;
+    private final TimeUnit timeUnit;
+    private final long duration;
+    private final Object defaultValue;
+    @SuppressWarnings("unchecked")
+    private final EvalFunc evalFunc;
+    private final Function<Tuple, Object> closure;
+
+    // Let us reflect upon our errors.
+    private final Class<? extends ErrorCallback> errorCallback;
+    private final Method errorHandler;
+    private final Method timeoutHandler;
+
+    @SuppressWarnings("unchecked")
+    public MonitoredUDFExecutor(EvalFunc udf) {
+        // is 10 enough? This is pretty arbitrary.
+        exec = MoreExecutors.getExitingExecutorService(new 
ScheduledThreadPoolExecutor(10));
+        this.evalFunc = udf;
+        MonitoredUDF anno = udf.getClass().getAnnotation(MonitoredUDF.class);
+        timeUnit = anno.timeUnit();
+        duration = anno.duration();
+        errorCallback = anno.errorCallback();
+
+        // The exceptions really should not happen since our handlers are 
defined by the parent class which 
+        // must be extended by all custom handlers.
+        try {
+            errorHandler = errorCallback.getMethod("handleError", 
EvalFunc.class, Exception.class);
+            timeoutHandler = errorCallback.getMethod("handleTimeout", 
EvalFunc.class, Exception.class);
+        } catch (SecurityException e1) {
+            throw new RuntimeException("Unable to use the monitored callback 
due to a Security Exception while working with "
+                    + evalFunc.getClass().getName());
+        } catch (NoSuchMethodException e1) {
+            throw new RuntimeException("Unable to use the monitored callback 
because a required method not found while working with "
+                    + evalFunc.getClass().getName());
+        }
+
+        Type retType = udf.getReturnType();
+        defaultValue = getDefaultValue(anno, retType);
+        closure = new Function<Tuple, Object>() {
+            public Object apply(Tuple input) {
+                try {
+                    return evalFunc.exec(input);
+                } catch (IOException e) {
+                    // I don't see a CheckedFunction in Guava. Resorting to 
this hackery.
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+
+    private Object getDefaultValue(MonitoredUDF anno, Type retType) {
+        if (retType.equals(Integer.TYPE) || retType.equals(Integer.class)) {
+            return (anno.intDefault().length == 0) ? null : 
anno.intDefault()[0];
+        } else if (retType.equals(Double.TYPE) || 
retType.equals(Double.class)) {
+            return (anno.doubleDefault().length == 0) ? null : 
anno.doubleDefault()[0];
+        } else if (retType.equals(Float.TYPE) || retType.equals(Float.class)) {
+            return (anno.floatDefault().length == 0) ? null : 
anno.floatDefault()[0];
+        } else if (retType.equals(Long.TYPE) || retType.equals(Long.class)) {
+            return (anno.longDefault().length == 0) ? null : 
anno.longDefault()[0];
+        } else if (retType.equals(String.class)) {
+            return (anno.stringDefault().length == 0) ? null : 
anno.stringDefault()[0];
+        } else {
+            // Default default is null.
+            return null;
+        }
+    }
+
+    /**
+     * This method *MUST* be called in the finish by POUserFunc.
+     * Though we do use an ExitingExecutorService just in case.
+     */
+    public void terminate() {
+        exec.shutdownNow();
+    }
+
+    /**
+     * UDF authors can optionally extend this class and provide the class of 
their custom callbacks in the annotation
+     * to perform their own handling of errors and timeouts.
+     */
+
+    public static class ErrorCallback {
+
+        @SuppressWarnings("unchecked")
+        public static void handleError(EvalFunc evalFunc, Exception e) {
+            evalFunc.getLogger().error(e);
+            StatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null && 
+                    reporter.getCounter(evalFunc.getClass().getName(), 
e.toString()) != null) {
+                reporter.getCounter(evalFunc.getClass().getName(), 
e.toString()).increment(1L);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        public static void handleTimeout(EvalFunc evalFunc, Exception e) {
+            evalFunc.getLogger().error(e);
+            StatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null && 
+                    reporter.getCounter(evalFunc.getClass().getName(), 
"MonitoredUDF Timeout") != null) {
+                reporter.getCounter(evalFunc.getClass().getName(), 
"MonitoredUDF Timeout").increment(1L);
+            }
+        }
+    }
+
+    public Object monitorExec(final Tuple input) throws IOException {
+        CheckedFuture<Object, Exception> f = 
+            Futures.makeChecked(
+                    // the Future whose exceptions we want to catch
+                    exec.submit(new Callable<Object>() {
+                        @Override
+                        public Object call() throws Exception {
+                            return closure.apply(input);
+                        }
+                    }), 
+                    // How to map those exceptions; we simply rethrow them.
+                    // Theoretically we could do some handling of 
+                    // CancellationException, ExecutionException  and 
InterruptedException here
+                    // and do something special for UDF IOExceptions as 
opposed to thread exceptions.
+                    new Function<Exception, Exception>() { 
+                        public Exception apply(Exception e) { 
+                            return e; 
+                        } 
+                    });
+
+        Object result = defaultValue;
+        
+        // The outer try "should never happen" (tm).
+        try {
+            try {
+                result = f.get(duration, timeUnit);
+            } catch (TimeoutException e) {
+                timeoutHandler.invoke(null, evalFunc, e);
+            } catch (Exception e) {
+                errorHandler.invoke(null, evalFunc, e);
+            } finally {
+                f.cancel(true);
+            }
+        } catch (IllegalArgumentException e) {
+            throw new IOException(e);
+        } catch (IllegalAccessException e) {
+            throw new IOException(e);
+        } catch (InvocationTargetException e) {
+            throw new IOException(e);
+        }
+        return result;
+    }
+}
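
Note on the timeout mechanism: monitorExec above submits the UDF call to an executor, waits on the resulting future for at most the configured duration, and returns the default value when the wait times out or the call fails. The following is a minimal sketch of that pattern using only java.util.concurrent, without Guava or Pig. The class TimeoutSketch and the method callWithTimeout are hypothetical names used for illustration and are not part of this commit; the lambdas assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Pig: runs a task with a time limit and
// falls back to a default value if the task times out or throws.
class TimeoutSketch {

    static <T> T callWithTimeout(ExecutorService pool, Callable<T> task,
                                 long duration, TimeUnit unit, T defaultValue) {
        Future<T> future = pool.submit(task);
        try {
            // Block for at most `duration`; a slow task raises TimeoutException.
            return future.get(duration, unit);
        } catch (TimeoutException e) {
            return defaultValue;
        } catch (ExecutionException e) {
            // The task itself threw; the cause is available via e.getCause().
            return defaultValue;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            return defaultValue;
        } finally {
            // Interrupts the worker if it is still running; no effect if done.
            future.cancel(true);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Integer fast = callWithTimeout(pool, () -> 321,
                    2, TimeUnit.SECONDS, 123);
            Integer slow = callWithTimeout(pool, () -> {
                Thread.sleep(2000);
                return 321;
            }, 100, TimeUnit.MILLISECONDS, 123);
            System.out.println("fast=" + fast + " slow=" + slow);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

As in the committed code, cancel(true) only interrupts the worker thread. A task that never checks its interrupt status and never blocks will keep running after the caller has moved on with the default value.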

Added: hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java?rev=956794&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MonitoredUDF.java Tue Jun 22 
07:04:41 2010
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor.ErrorCallback;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+
+/**
+ * Describes how the execution of a UDF should be monitored, and what 
+ * to do if it times out.
+ * <p>
+ * NOTE: does not work with UDFs that implement the Accumulator interface
+ * <p>
+ *     
+ * Setting a default value will cause it to be used instead of null when the 
UDF times out.
+ * The appropriate value among int, long, string, etc, is used.
+ * The default fields of these annotations are arrays for Java reasons I won't 
bore you with.
+ * <p>
+ * Set them as if they were primitives: <code>@MonitoredUDF( intDefault=5 
)</code>
+ * <p>
+ * There is currently no way to provide a default ByteArray, Tuple, Map, or 
Bag. Null will always be used for those.
+ * <p>
+ * Currently this annotation is ignored when the Accumulator interface is used.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@Documented
+@Retention(value=RetentionPolicy.RUNTIME)
+public @interface MonitoredUDF {
+    /**
+     * Time Units in which to measure timeout value.
+     * @return Time Units in which to measure timeout value.
+     */
+    TimeUnit timeUnit() default TimeUnit.SECONDS;
+    
+    /**
+     * Number of time units after which the execution should be halted and 
default returned.
+     * @return Number of time units after which the execution should be halted 
and default returned.
+     */
+    int duration() default 10;
+    
+    int[] intDefault() default {};
+    long[] longDefault() default {};
+    double[] doubleDefault() default {};
+    float[] floatDefault() default {};
+    String[] stringDefault() default {};
+  
+    /**
+     * UDF author can implement a static extension of 
MonitoredUDFExecutor.ErrorCallback and provide its class
+     * to the annotation in order to perform custom error handling.
+     * @return
+     */
+    Class<? extends ErrorCallback> errorCallback() default 
MonitoredUDFExecutor.ErrorCallback.class;
+}
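
Note on the array-typed defaults: the Javadoc above does not spell out why the default fields are arrays. A likely reason is that annotation elements cannot be null, so an empty array is a convenient way to represent "no default given". The sketch below shows that idea in isolation. The annotation Fallback and the classes around it are hypothetical and are not part of this commit; only the empty-array check mirrors getDefaultValue in MonitoredUDFExecutor.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical illustration, not part of Pig: an annotation whose int
// default is optional, encoded as a possibly empty array.
class AnnotationDefaultSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @interface Fallback {
        // Empty array means "no default specified".
        int[] intDefault() default {};
    }

    // Single-element arrays may be written without braces.
    @Fallback(intDefault = 5)
    static class WithDefault {}

    @Fallback
    static class WithoutDefault {}

    static class NotAnnotated {}

    // Returns the configured default, or null when none was given.
    static Integer intDefaultOf(Class<?> type) {
        Fallback anno = type.getAnnotation(Fallback.class);
        if (anno == null || anno.intDefault().length == 0) {
            return null;
        }
        return anno.intDefault()[0];
    }

    public static void main(String[] args) {
        System.out.println(intDefaultOf(WithDefault.class));
        System.out.println(intDefaultOf(WithoutDefault.class));
        System.out.println(intDefaultOf(NotAnnotated.class));
    }
}
```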

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java?rev=956794&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMonitoredUDF.java Tue Jun 22 
07:04:41 2010
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pig.EvalFunc;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor.ErrorCallback;
+import org.apache.pig.builtin.MonitoredUDF;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class TestMonitoredUDF {
+
+    @Test
+    public void testTimeout() throws IOException {
+
+        SimpleUDF udf = new SimpleUDF(1000);
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertNull(exec.monitorExec(null));
+    }
+
+    @Test
+    public void testTimeoutWithDefault() throws IOException {
+
+        SimpleIntUDF udf = new SimpleIntUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertEquals( SimpleIntUDF.DEFAULT, ((Integer) 
exec.monitorExec(null)).intValue());
+    }
+
+    @Test
+    public void testCustomErrorHandler() throws IOException {
+
+        ErrorCallbackUDF udf = new ErrorCallbackUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        exec.monitorExec(null);
+        assertTrue(thereWasATimeout);
+    }
+
+    @Test
+    public void testNoTimeout() throws IOException {
+        SimpleUDF udf = new SimpleUDF(100);
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(udf);
+        assertTrue((Boolean) exec.monitorExec(null));
+    }
+
+    public static void main(String[] args) throws IOException {
+        long startTime = System.currentTimeMillis();
+        long unmonitoredTime = 0, monitoredTime = 0;
+        
+        int[] numReps = { 1000, 10000, 100000, 1000000};
+        MonitoredNoOpUDF monitoredUdf  = new MonitoredNoOpUDF();
+        MonitoredUDFExecutor exec = new MonitoredUDFExecutor(monitoredUdf);
+        UnmonitoredNoOpUDF unmonitoredUdf = new UnmonitoredNoOpUDF();
+        // warm up
+        System.out.println("Warming up.");
+        for (int i : numReps) {
+            for (int j=0; j < i; j++) {
+                exec.monitorExec(null);
+                unmonitoredUdf.exec(null);
+            }
+        }
+        System.out.println("Warmed up. Timing.");
+        // tests!
+        for (int k = 0; k < 5; k++) {
+            for (int i : numReps) {
+                startTime = System.currentTimeMillis();
+                for (int j = 0; j < i; j++) {
+                    exec.monitorExec(null);
+                }
+                monitoredTime = System.currentTimeMillis() - startTime;
+                startTime = System.currentTimeMillis();
+                for (int j = 0; j < i; j++) {
+                    unmonitoredUdf.exec(null);
+                }
+                unmonitoredTime = System.currentTimeMillis() - startTime;
+                System.out.println("Reps: " + i + " monitored: " + 
monitoredTime + " unmonitored: " + unmonitoredTime);
+            }    
+        }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 500)
+    public class SimpleUDF extends EvalFunc<Boolean> {
+        int wait;
+        public SimpleUDF(int wait) {
+            this.wait = wait;
+        }
+
+        @Override
+        public Boolean exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(wait);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return true;
+        }
+    }
+    
+    public static class UnmonitoredNoOpUDF extends EvalFunc<Boolean> {
+        @Override public Boolean exec(Tuple input) throws IOException { 
+            System.currentTimeMillis(); return true; }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 500)
+    public static class MonitoredNoOpUDF extends EvalFunc<Boolean> {
+        @Override public Boolean exec(Tuple input) throws IOException { 
+            System.currentTimeMillis(); return true; }
+    }
+
+    
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, intDefault 
= SimpleIntUDF.DEFAULT)
+    public class SimpleIntUDF extends EvalFunc<Integer> {
+        public static final int DEFAULT = 123;
+        public static final int NOT_DEFAULT = 321;
+
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return  NOT_DEFAULT;
+        }
+    }
+
+    @MonitoredUDF(timeUnit = TimeUnit.MILLISECONDS, duration = 100, 
errorCallback = CustomErrorCallback.class)
+    public class ErrorCallbackUDF extends EvalFunc<Integer> {
+
+        @Override
+        public Integer exec(Tuple input) throws IOException {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+            return null;
+        }
+
+    }
+
+    static boolean thereWasATimeout = false;
+
+    public static class CustomErrorCallback extends ErrorCallback {
+
+        @SuppressWarnings("unchecked")
+        public static void handleTimeout(EvalFunc evalFunc, Exception e) {
+            thereWasATimeout = true;
+        }
+    }
+}
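
Note on the static callbacks: CustomErrorCallback above declares only handleTimeout, yet the executor looks up both handleError and handleTimeout on the callback class via getMethod and calls them with invoke(null, ...). This works because Class.getMethod also finds public static methods inherited from a superclass, while a static method redeclared in the subclass takes precedence. The sketch below demonstrates that lookup on its own. All names in it (StaticCallbackSketch, BaseCallback, CustomCallback, dispatch) are hypothetical and simplified to take a String instead of an EvalFunc and an Exception.

```java
import java.lang.reflect.Method;

// Hypothetical illustration, not part of Pig: reflective dispatch to static
// handler methods that a subclass may redeclare.
class StaticCallbackSketch {

    public static class BaseCallback {
        public static String handleError(String source) {
            return "base error: " + source;
        }

        public static String handleTimeout(String source) {
            return "base timeout: " + source;
        }
    }

    // Redeclares only handleTimeout; handleError is inherited from the base.
    public static class CustomCallback extends BaseCallback {
        public static String handleTimeout(String source) {
            return "custom timeout: " + source;
        }
    }

    static String dispatch(Class<? extends BaseCallback> callback,
                           String methodName, String source) {
        try {
            // getMethod searches the class and then its superclasses.
            Method handler = callback.getMethod(methodName, String.class);
            // The target is null because the handler is static.
            return (String) handler.invoke(null, source);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(CustomCallback.class, "handleTimeout", "udf"));
        System.out.println(dispatch(CustomCallback.class, "handleError", "udf"));
        System.out.println(dispatch(BaseCallback.class, "handleTimeout", "udf"));
    }
}
```

Static methods are hidden rather than overridden, so the choice of handler depends entirely on which Class object is passed to getMethod. That is why the annotation carries the callback class rather than an instance.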

