Author: olga
Date: Mon Dec  8 11:54:25 2008
New Revision: 724462

URL: http://svn.apache.org/viewvc?rev=724462&view=rev
Log:
PIG-546: FilterFunc calls empty constructor when it should be calling 
parameterized constructor

Added:
    hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java

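In user-facing terms, the bug showed up when a UDF was bound to an alias with constructor arguments via "define": the parser kept only the function name, so the function was later instantiated through its empty constructor and the arguments were silently dropped. A minimal sketch of the affected pattern, modelled on the new tests added below (the alias and file names are illustrative only):

    // Before this change PS resolved to PigStorage() rather than PigStorage(":"),
    // because only the class name, not the full spec, reached instantiation.
    pigServer.registerQuery("define PS PigStorage(':');");
    pigServer.registerQuery("a = load 'a' using PS as (x:int, y:double, str:chararray);");
    pigServer.registerQuery("store a into 'x' using PS;");
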
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Dec  8 11:54:25 2008
@@ -325,3 +325,6 @@
     PIG-538: support for null constants (pradeepk via olgan)
 
     PIG-385: more null handling (pradeepl via olgan)
+
+    PIG-546: FilterFunc calls empty constructor when it should be calling
+    parameterized constructor

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Dec  8 11:54:25 2008
@@ -54,6 +54,9 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -615,6 +618,33 @@
         }
 }
 
+class FunctionType {
+    public static final byte UNKNOWNFUNC = 0;
+    public static final byte EVALFUNC = 2;
+    public static final byte COMPARISONFUNC = 4;
+    public static final byte LOADFUNC = 8; 
+    public static final byte STOREFUNC = 16;
+
+    public static void tryCasting(Object func, byte funcType) throws Exception {
+        switch(funcType) {
+        case FunctionType.EVALFUNC:
+                       EvalFunc evalFunc = (EvalFunc) func;
+            break;
+        case FunctionType.COMPARISONFUNC:
+                       ComparisonFunc comparisonFunc = (ComparisonFunc) func;
+            break;
+        case FunctionType.LOADFUNC:
+                       LoadFunc loadFunc = (LoadFunc) func;
+            break;
+        case FunctionType.STOREFUNC:
+                       StoreFunc storeFunc = (StoreFunc) func;
+            break;
+        default:
+            throw new Exception("Received an unknown function type: " + funcType);
+        }
+    }
+}
+
 PARSER_END(QueryParser)
 
 // Skip all the new lines, tabs and spaces
@@ -972,12 +1002,7 @@
 {
        (       filename = FileName()
                (
-               <USING>  funcName = QualifiedFunction() "(" funcArgs = StringList() ")"
-               {
-                       funcSpecAsString = funcName + "(" + funcArgs + ")";
-                       funcSpec = new FuncSpec(funcSpecAsString);
-                       log.debug("LoadClause: funcSpec = " + funcSpec);
-               }
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.LOADFUNC)
                )?
                (
                <SPLIT> <BY> t3 = <QUOTEDSTRING>
@@ -990,7 +1015,7 @@
                )?
        )
        {
-               if (funcSpecAsString == null){
+               if (funcSpec == null){
                        funcSpecAsString = PigStorage.class.getName();
                        funcSpec = new FuncSpec(funcSpecAsString);
                        log.debug("LoadClause: funcSpec = " + funcSpec);
@@ -1272,23 +1297,7 @@
                                log.debug("PUnaryCond: Connected operator " + 
cond.getClass().getName() + " " + cond + " to " + lhs + " logical plan " + lp);
                        }
                )
-|      LOOKAHEAD(EvalFunction() "(") 
-               (evalFunc=EvalFunction() "(" args=EvalArgs(over,specs,lp,input) ")" 
-                       {
-                FuncSpec funcSpec = new FuncSpec(evalFunc.getClass().getName());
-                Type javaType = evalFunc.getReturnType();
-                byte type = DataType.findType(javaType);
-
-                log.debug("Return type of UDF: " + DataType.findTypeName(type));
-                               cond = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
-                               lp.add(cond);
-                               log.debug("PUnaryCond: Added operator " + cond.getClass().getName() + " " + cond + " to logical plan " + lp);
-                               for(ExpressionOperator exprOp: args) {
-                                       lp.connect(exprOp, cond);
-                                       log.debug("PUnaryCond: Added operator " + exprOp.getClass().getName() + " " + cond + " to logical plan " + lp);
-                               }
-                       }
-               )
+|      LOOKAHEAD(EvalFuncSpec(over, specs, lp, input)) cond = EvalFuncSpec(over,specs,lp, input, FunctionType.EVALFUNC)
 |      cond = PNullCond(over,specs,lp,input)
 |      cond = PNotCond(over,specs,lp,input)
 
@@ -1433,6 +1442,7 @@
        boolean asc = true; 
        String funcName = null; 
        Token t1;
+    FuncSpec funcSpec = null;
        log.trace("Entering OrderClause");
 }
 {
@@ -1461,13 +1471,13 @@
                }       
        )
        (
-        <USING>  funcName = QualifiedFunction()
+           <USING>  funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
     )?
 
        )
        {
                LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder, 
-                                         (funcName != null ? new FuncSpec(funcName) : null));
+                                         funcSpec );
                sort.setStar(star);
                sort.setLimit(-1);
                lp.add(sort);
@@ -1860,17 +1870,16 @@
     String stream, deserializer;
     StreamingCommand.HandleSpec[] handleSpecs;
     String functionName = "PigStorage", functionArgs="";
+    byte funcType = (handle.compareTo(StreamingCommand.Handle.INPUT) != 0 ? FunctionType.LOADFUNC : FunctionType.STOREFUNC) ;
+    FuncSpec funcSpec = null;
 } 
 {
     stream = CommandStream() 
     [
-        <USING> functionName = QualifiedFunction() 
-        [
-            "(" functionArgs = StringList() ")"
-        ]
+           <USING>  funcSpec = NonEvalFuncSpec(funcType)
     ]
     {
-        deserializer = functionName + "(" + functionArgs + ")";
+        deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
         command.addHandleSpec(handle, 
                               new HandleSpec(stream, deserializer)
                              );
@@ -1879,13 +1888,10 @@
         "," 
         stream = CommandStream() 
         [
-            <USING> functionName = QualifiedFunction() 
-            [
-                "(" functionArgs = StringList() ")"
-            ]
+               <USING>  funcSpec = NonEvalFuncSpec(funcType)
         ] 
         {
-            deserializer = functionName + "(" + functionArgs + ")";
+            deserializer = (funcSpec == null? functionName + "(" + ")" : funcSpec.toString());
             command.addHandleSpec(handle, 
                                   new HandleSpec(stream, deserializer)
                                  );
@@ -1921,22 +1927,20 @@
        }
 }
 LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; 
-                                                String functionName, functionArgs;}
+                                                String functionName, functionArgs; FuncSpec funcSpec = null;}
 {
     t = <IDENTIFIER> <INTO> fileName = FileName()
     (
-        <USING> functionName = QualifiedFunction() {functionSpec = functionName;}
-        (
-            "(" functionArgs = StringList() ")" {functionSpec = functionSpec + "(" + functionArgs + ")";}
-        )?
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.STOREFUNC)
     )?
     {
 
-        if (functionSpec == null)
-            functionSpec = PigStorage.class.getName() + "()";
+        if (funcSpec == null) {
+            funcSpec = new FuncSpec(PigStorage.class.getName() + "()");
+        }
 
         LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
-                                            new FileSpec(fileName, new FuncSpec(functionSpec)));
+                                            new FileSpec(fileName, funcSpec));
 
         LogicalOperator input = mapAliasOp.get(t.image);
         if (input == null)
@@ -1975,7 +1979,7 @@
        (
        t = <IDENTIFIER> "="
        (
-       LOOKAHEAD(FuncEvalSpec(over, specs, lp, input)) item = InfixExpr(over,specs,lp, input)
+       LOOKAHEAD(EvalFuncSpec(over, specs, lp, input, FunctionType.EVALFUNC)) item = InfixExpr(over,specs,lp, input)
     {
         lp.add(item);
     }
@@ -2126,6 +2130,7 @@
        LogicalOperator eOp;
        Token t; 
        boolean asc = true; 
+    FuncSpec funcSpec = null;
        log.trace("Entering NestedSortOrArrange");}
 {
        (
@@ -2157,13 +2162,13 @@
                        }               
        )     
     (
-        <USING>  funcName = QualifiedFunction()
+        <USING>  funcSpec = NonEvalFuncSpec(FunctionType.COMPARISONFUNC)
     )?
        )
        {       
                log.debug("Before creating LOSort");
                LOSort sort = new LOSort(lp, new OperatorKey(scope, getNextId()), sortColPlans, ascOrder, 
-                                         (funcName != null ? new FuncSpec(funcName) : null));
+                                         funcSpec);
                sort.setStar(star);
                log.debug("After creating LOSort");
                try {
@@ -2479,8 +2484,8 @@
     LOOKAHEAD(Const(lp)) item = Const(lp)
 |      (
        (
-               LOOKAHEAD(FuncEvalSpec(over,specs,lp,input))
-               item = FuncEvalSpec(over,specs,lp,input)
+               LOOKAHEAD(EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC))
+               item = EvalFuncSpec(over,specs,lp,input, FunctionType.EVALFUNC)
        |       item = ColOrSpec(over,specs,lp,input) 
        |       item = BinCond(over,specs,lp,input)
        
@@ -2547,7 +2552,7 @@
 }
 
 
-ExpressionOperator FuncEvalSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
+ExpressionOperator EvalFuncSpec(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input, byte funcType) : 
 {
        String funcName = null; 
        FuncSpec funcSpec = null; 
@@ -2556,55 +2561,117 @@
        List<ExpressionOperator> args;
        ExpressionOperator userFunc;
     LOUserFunc userAliasFunc = null;
-    EvalFunc evalFunc = null;
-       log.trace("Entering FuncEvalSpec");
+    Object func = null;
+       log.trace("Entering EvalFuncSpec");
 }
 {
        (
     (
    LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
     {
-        
+               func = pigContext.instantiateFuncFromAlias(funcNameAlias);
                try{
-                       evalFunc = (EvalFunc) pigContext.instantiateFuncFromAlias(funcNameAlias);
-            Type javaType = evalFunc.getReturnType();
-            log.debug("Type: " + javaType);
-            log.debug("funcName: " + funcName + " class name: " + evalFunc.getClass().getName() + " return type: " + DataType.findType(javaType));
-               }catch (Exception e){
+            FunctionType.tryCasting(func, funcType);
+               } catch (Exception e){
                        throw new ParseException(e.getMessage());
                }
     }
     )
-|   evalFunc=EvalFunction()
+|   func=EvalFunction(funcType)
     )
     "(" args=EvalArgs(over,specs,lp,input) ")" 
        {
-               if(null != evalFunc) {
-            funcName = evalFunc.getClass().getName();
+               if(null != func) {
+            funcName = func.getClass().getName();
             if(null != funcNameAlias) {
                 funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
             } else {
                 funcSpec = new FuncSpec(funcName);
             }
-            Type javaType = evalFunc.getReturnType();
-            byte type = DataType.findType(javaType);
-            log.debug("Return type of UDF: " + DataType.findTypeName(type));
-            log.debug("FuncEvalSpec: funcSpec: " + funcSpec);
+            byte type = DataType.BYTEARRAY;
+            switch(funcType) {
+            case FunctionType.EVALFUNC:
+                Type javaType = ((EvalFunc)func).getReturnType();
+                type = DataType.findType(javaType);
+                log.debug("Return type of UDF: " + DataType.findTypeName(type));
+                log.debug("EvalFuncSpec: funcSpec: " + funcSpec);
+                break;
+            default:
+                throw new ParseException("Received an unknown function type: " + funcType);
+            }
                        userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcSpec, args, type);
         } else {
             throw new ParseException("Could not instantiate function: " + funcNameAlias);
         }
                lp.add(userFunc);
-               log.debug("FuncEvalSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
+               log.debug("EvalFuncSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
                for(ExpressionOperator exprOp: args) {
                        lp.connect(exprOp, userFunc);
-                       log.debug("FuncEvalSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
+                       log.debug("EvalFuncSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
                }
-               log.trace("Exiting FuncEvalSpec");
+               log.trace("Exiting EvalFuncSpec");
                return userFunc;
        }
 }
 
+FuncSpec NonEvalFuncSpec(byte funcType) : 
+{
+       String functionName = null; 
+       FuncSpec funcSpec = null; 
+       String funcNameAlias = null; 
+    String functionArgs = null;
+    Object func = null;
+       log.trace("Entering NonEvalFuncSpec");
+}
+{
+       (
+    (
+    LOOKAHEAD({ null != pigContext.getFuncSpecFromAlias(getToken(1).image) }) funcNameAlias=QualifiedFunction()
+    {
+               func = pigContext.instantiateFuncFromAlias(funcNameAlias);
+               try{
+            FunctionType.tryCasting(func, funcType);
+               } catch (Exception e){
+                       throw new ParseException(e.getMessage());
+               }
+    }
+    )
+|   functionName = QualifiedFunction() ( "(" functionArgs = StringList() ")" )?
+    )
+       {
+               if(null != func) {
+            functionName = func.getClass().getName();
+            if(null != funcNameAlias) {
+                funcSpec = pigContext.getFuncSpecFromAlias(funcNameAlias);
+            } else {
+                funcSpec = new FuncSpec(functionName);
+            }
+        } else if (functionName != null) {
+            funcSpec = new FuncSpec(functionName + (functionArgs == null? "(" + ")" : "(" + functionArgs + ")"));
+        } else {
+            throw new ParseException("Could not instantiate function: " + funcNameAlias);
+        }
+            switch(funcType) {
+            case FunctionType.COMPARISONFUNC:
+            case FunctionType.LOADFUNC:
+            case FunctionType.STOREFUNC:
+                //funcSpec = new FuncSpec(func.getClass().getName() + (functionArgs == null? "(" + ")" : "(" + functionArgs + ")"));
+                System.err.println("funcSpec: " + funcSpec);
+                func = pigContext.instantiateFuncFromSpec(funcSpec);
+                       try{
+                    FunctionType.tryCasting(func, funcType);
+                       } catch (Exception e){
+                               throw new ParseException(e.getMessage());
+                       }
+                break;
+            default:
+                throw new ParseException("Received an unknown function type: " + funcType);
+            }
+               log.trace("Exiting NonEvalFuncSpec");
+               return funcSpec;
+       }
+}
+
 List<ExpressionOperator> EvalArgs(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input) : 
 {
        ArrayList<ExpressionOperator> specList = new ArrayList<ExpressionOperator>(); 
@@ -2943,26 +3010,24 @@
 
 // These the simple non-terminals that are shared across many
 
-EvalFunc  EvalFunction() : 
+Object  EvalFunction(byte funcType) : 
 {
        String funcName;
-    EvalFunc ef;
+    Object func = null;
        log.trace("Entering EvalFunction");
 }
 {
        funcName = QualifiedFunction()
        {
+        func = pigContext.instantiateFuncFromAlias(funcName);
                try{
-                       ef = (EvalFunc) pigContext.instantiateFuncFromAlias(funcName);
-            Type javaType = ef.getReturnType();
-            log.debug("Type: " + javaType);
-            log.debug("funcName: " + funcName + " class name: " + ef.getClass().getName() + " return type: " + DataType.findType(javaType));
+            FunctionType.tryCasting(func, funcType);
                }catch (Exception e){
                        throw new ParseException(e.getMessage());
                }
                log.trace("Exiting EvalFunction");
                
-               return ef;
+               return func;
        }
 }
 

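The net effect of the grammar changes above: every USING clause now produces a FuncSpec instead of a bare function name, and define'd aliases are resolved through the PigContext so their constructor arguments survive into LOLoad/LOStore/LOSort. A rough sketch of the alias path for a load function, using only calls that appear in this diff (simplified for illustration, not the exact parser code):

    // The spec registered by `define PS PigStorage(':')` is looked up whole...
    FuncSpec funcSpec = pigContext.getFuncSpecFromAlias("PS");
    // ...the function is re-instantiated from that spec (parameterized constructor)...
    Object func = pigContext.instantiateFuncFromSpec(funcSpec);
    // ...and validated to be of the kind the clause expects; throws otherwise.
    FunctionType.tryCasting(func, FunctionType.LOADFUNC);
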
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Mon Dec  8 11:54:25 2008
@@ -32,9 +32,9 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.test.utils.*;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,8 +44,9 @@
     private MiniCluster cluster = MiniCluster.buildCluster();
     private File tmpFile;
     
+    TupleFactory tf = TupleFactory.getInstance();
+
     public TestFilterUDF() throws ExecException, IOException{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         int LOOP_SIZE = 20;
         tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -57,11 +58,29 @@
     
     @Before
     public void setUp() throws Exception {
-        
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        int LOOP_SIZE = 20;
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            ps.println(i);
+        }
+        ps.close();
     }
 
     @After
     public void tearDown() throws Exception {
+        tmpFile.delete();
+    }
+    
+    private File createFile(String[] data) throws Exception{
+        File f = File.createTempFile("tmp", "");
+        PrintWriter pw = new PrintWriter(f);
+        for (int i=0; i<data.length; i++){
+            pw.println(data[i]);
+        }
+        pw.close();
+        return f;
     }
     
     static public class MyFilterFunction extends EvalFunc<Boolean>{
@@ -96,4 +115,38 @@
         }
         assertEquals(10, cnt);
     }
+
+    @Test
+    public void testFilterUDFusingDefine() throws Exception{
+        File inputFile= createFile(
+                    new String[]{ 
+                        "www.paulisageek.com\t4",
+                        "www.yahoo.com\t12344",
+                        "google.com\t1",
+                        "us2.amazon.com\t4141"
+                    }
+                );
+
+        File filterFile = createFile(
+                    new String[]{ 
+                        "12344"
+                    }
+                );
+
+        pigServer.registerQuery("define FILTER_CRITERION " + FILTERFROMFILE.class.getName() + "('" + FileLocalizer.hadoopify(Util.generateURI(filterFile.toString()), pigServer.getPigContext()) + "');");
+        pigServer.registerQuery("a = LOAD '" + Util.generateURI(inputFile.toString()) + "' as (url:chararray, numvisits:int);");
+        pigServer.registerQuery("b = filter a by FILTER_CRITERION(numvisits);");
+
+        Tuple expectedTuple = tf.newTuple();
+        expectedTuple.append(new String("www.yahoo.com"));
+        expectedTuple.append(new Integer("12344"));
+
+        Iterator<Tuple> iter = pigServer.openIterator("b");
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            assertTrue(t.equals(expectedTuple));
+        }
+
+    }
+        
 }

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Mon Dec  8 11:54:25 2008
@@ -1839,7 +1839,44 @@
         "CONCAT(null, str);";
         buildPlan(query);
     }
+
+    @Test
+    public void testFilterUdfDefine() {
+        String query = "define isempty IsEmpty(); a = load 'a' as (x:int, y:double, str:chararray);" +
+                       "b = filter a by isempty(*);";
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testLoadUdfDefine() {
+        String query = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+                       "b = filter a by IsEmpty(*);";
+        buildPlan(query);
+    }
     
+    @Test
+    public void testLoadUdfConstructorArgDefine() {
+        String query = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+                       "b = filter a by IsEmpty(*);";
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testStoreUdfDefine() {
+        String query = "define PS PigStorage(); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+                       "b = filter a by IsEmpty(*);" +
+                "store b into 'x' using PS;" ;
+        buildPlan(query);
+    }
+    
+    @Test
+    public void testStoreUdfConstructorArgDefine() {
+        String query = "define PS PigStorage(':'); a = load 'a' using PS as (x:int, y:double, str:chararray);" +
+                       "b = filter a by IsEmpty(*);" +
+                "store b into 'x' using PS;" ;
+        buildPlan(query);
+    }
+
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=724462&r1=724461&r2=724462&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Mon Dec  8 11:54:25 2008
@@ -302,6 +302,77 @@
     }
 
     @Test
+    public void testInputShipSpecsWithUDFDefine() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        
+        pigServer.registerQuery(
+                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery(
+                "define CMD1 `" + command1.getName() + " foo` " +
+                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
+                "input('foo' using PS ) " +
+                "output(stdout using PS ) " +
+                "stderr();"); 
+        pigServer.registerQuery(
+                "define CMD2 `" + command2.getName() + " bar` " +
+                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
+                "input('bar' using PS ) " +
+                "output(stdout using PS ) " +        
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS ;");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+                                       "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+        
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
+    @Test
     public void testInputCacheSpecs() throws Exception {
         // Can't run this without HDFS
         if(execType == ExecType.LOCAL)
@@ -437,6 +508,68 @@
     }
 
     @Test
+       public void testOutputShipSpecsWithUDFDefine() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+           File input = Util.createInputFile("tmp", "", 
+                                             new String[] {"A,1", "B,2", "C,3", 
+                                                           "D,2", "A,5", "B,5", 
+                                                           "C,8", "A,8", "D,8", 
+                                                           "A,9"});
+
+           // Perl script 
+           String[] script = 
+               new String[] {
+                             "#!/usr/bin/perl",
+                          "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
+                          "while (<STDIN>) {",
+                          "  print OUTFILE \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "  print OUTFILE2 \"A,10\n\";",
+                          "}",
+                            };
+           File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "A", "A", "A", "A", "A"};
+        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
+        Tuple[] expectedResults = 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define PS " + PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                       "output('foo' using PS, " +
+                       "'bar' using PS) " +
+                       "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using PS;");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+        
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output+"/bar", 
+                                            pigServer.getPigContext());
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+    @Test
     public void testInputOutputSpecs() throws Exception {
         // FIXME : this should be tested in all modes
         if(execType == ExecType.LOCAL)

Added: hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=724462&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/FILTERFROMFILE.java Mon Dec  8 11:54:25 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.File;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.lang.Boolean;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+
+/**
+ * 
+ * define MyFilterSet util.FILTERFROMFILE('/user/pig/filterfile');
+ * 
+ * A = load 'mydata' using PigStorage() as ( a, b );
+ * B = filter A by MyFilterSet(a);
+ * 
+ */
+public class FILTERFROMFILE extends FilterFunc{
+       private String FilterFileName = "";
+       
+       public FILTERFROMFILE(){ 
+    }
+       
+       public FILTERFROMFILE(String FilterFileName){
+               this.FilterFileName = FilterFileName;
+       }
+       
+       Map<String, Boolean> lookupTable = null;
+       
+       
+       private void init() throws IOException {
+           
+               lookupTable = new HashMap<String, Boolean>();
+               
+               Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob);
+               InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
+
+               BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+               
+               while (true){
+                       String line = reader.readLine();
+
+                       if (line == null)
+                               break;
+
+                       String FilterField = line.split("\t")[0];
+                       
+                       lookupTable.put(FilterField, Boolean.TRUE);
+               }
+       }
+       
+       @Override
+       public Boolean exec(Tuple input) throws IOException {
+        if (lookupTable == null){
+               init();
+         }     
+       String s;
+        try {
+            s = input.get(0).toString();
+        } catch (ExecException e) {
+            IOException ioe = new IOException("Error getting data");
+            ioe.initCause(e);
+            throw ioe;
+        }
+               
+          boolean matched = lookupTable.containsKey(s);
+
+          return(matched);
+    }
+}

