Author: pradeepkth
Date: Fri Mar  6 22:41:15 2009
New Revision: 751117

URL: http://svn.apache.org/viewvc?rev=751117&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth) -additional checking 
of patch 'file_cmds-0305.patch'

Modified:
    hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
    
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
    
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java

Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Fri Mar  6 
22:41:15 2009
@@ -191,12 +191,7 @@
             throw new IllegalStateException("setBatchOn() must be called 
first.");
         }
         
-        try {
-            currDAG.execute();
-        } finally {
-            log.info("Delete the current graph.");
-            currDAG = graphs.pop();
-        }
+        currDAG.execute();
     }
 
     /**
@@ -332,62 +327,13 @@
     }
  
     public LogicalPlan clonePlan(String alias) throws IOException {
-        // There are two choices on how we clone the logical plan
-        // 1 - we really clone each operator and connect up the cloned 
operators
-        // 2 - we cache away the script till the point we need to clone
-        // and then simply re-parse the script. 
-        // The latter approach is used here
-        // FIXME: There is one open issue with this now:
-        // Consider the following script:
-        // A = load 'file:/somefile';
-        // B = filter A by $0 > 10;
-        // store B into 'bla';
-        // rm 'file:/somefile';
-        // A = load 'file:/someotherfile'
-        // when we try to clone - we try to reparse
-        // from the beginning and currently the parser
-        // checks for file existence of files in the load
-        // in the case where the file is a local one -i.e. with file: prefix
-        // This will be a known issue now and we will need to revisit later
-        
-        // parse each line of the cached script and the
-        // final logical plan is the clone that we want
-        LogicalPlan lp = null;
-        int lineNumber = 1;
-        
-        // create data structures needed for parsing        
-        Graph graph = new Graph(true);
-        
-        for (Iterator<String> it = currDAG.getScriptCache().iterator(); 
it.hasNext(); lineNumber++) {
-            if (isBatchOn()) {
-                graph.registerQuery(it.next(), lineNumber);
-            } else {
-                lp = graph.parseQuery(it.next(), lineNumber);
-            }
-        }
-        
-        if(alias == null) {
-            // a store prompted the execution - so return
-            // the entire logical plan
-            if (isBatchOn()) {
-                lp = new LogicalPlan();
-                for (LogicalPlan lpPart : graph.getStoreOpTable().values()) {
-                    lp.mergeSharedPlan(lpPart);
-                }
-            }
+        Graph graph = currDAG.clone();
 
-            return lp;
-        } else {
-            // return the logical plan corresponding to the 
-            // alias supplied
-            LogicalOperator op = graph.getAliasOp().get(alias);
-            if(op == null) {
-                int errCode = 1003;
-                String msg = "Unable to find an operator for alias " + alias;
-                throw new FrontendException(msg, errCode, PigException.INPUT);
-            }
-            return graph.getAliases().get(op);
+        if (graph == null) {
+            throw new AssertionError("Re-parsing has failed");
         }
+
+        return graph.getPlan(alias);
     }
     
     public void registerQuery(String query) throws IOException {
@@ -435,20 +381,6 @@
      * result
      */
     public Iterator<Tuple> openIterator(String id) throws IOException {
-        if (isBatchOn()) {
-            log.info("Skip DUMP command in batch mode.");
-            return new Iterator<Tuple>() {
-                public boolean hasNext() {
-                    return false;
-                }
-                public Tuple next() {
-                    return null;
-                }
-                public void remove() {
-                }
-            };
-        }
-
         try {
             LogicalOperator op = currDAG.getAliasOp().get(id);
             if(null == op) {
@@ -913,14 +845,21 @@
         private String jobName;
         
         private boolean batchMode;
-      
+
+        private int processedStores;
+
+        private int ignoreNumStores;
+        
+        private LogicalPlan lp;
         
         Graph(boolean batchMode) { 
             this.batchMode = batchMode;
+            this.processedStores = 0;
+            this.ignoreNumStores = 0;
             this.jobName = "DefaultJobName";
+            this.lp = new LogicalPlan();
         };
         
-        
         Map<LogicalOperator, LogicalPlan> getAliases() { return aliases; }
         
         Map<OperatorKey, LogicalOperator> getOpTable() { return opTable; }
@@ -936,25 +875,42 @@
         void execute() throws ExecException, FrontendException {
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, 
PigContext.JOB_NAME_PREFIX + ":" + jobName);
             PigServer.this.execute(null);
+            processedStores = storeOpTable.keySet().size();
         }
 
         void setJobName(String name) {
             jobName = name;
         }
 
+        LogicalPlan getPlan(String alias) throws IOException {
+            LogicalPlan plan = lp;
+                
+            if (alias != null) {
+                LogicalOperator op = aliasOp.get(alias);
+                if(op == null) {
+                    int errCode = 1003;
+                    String msg = "Unable to find an operator for alias " + 
alias;
+                    throw new FrontendException(msg, errCode, 
PigException.INPUT);
+                }
+                plan = aliases.get(op);
+            }
+            return plan;
+        }
+
         void registerQuery(String query, int startLine) throws IOException {
             
-            LogicalPlan lp = parseQuery(query, startLine);
+            LogicalPlan tmpLp = parseQuery(query, startLine);
             
             // store away the query for use in cloning later
             scriptCache.add(query);
-            if (lp.getLeaves().size() == 1) {
-                LogicalOperator op = lp.getSingleLeafPlanOutputOp();
-
+            if (tmpLp.getLeaves().size() == 1) {
+                LogicalOperator op = tmpLp.getSingleLeafPlanOutputOp();
+                
                 // Check if we just processed a LOStore i.e. STORE
                 if (op instanceof LOStore) {
 
                     if (!batchMode) {
+                        lp = tmpLp;
                         try {
                             execute();
                         } catch (Exception e) {
@@ -965,9 +921,13 @@
                                     PigException.INPUT, e);
                         }
                     } else {
-                        storeOpTable.put(op, lp);
+                        if (0 == ignoreNumStores) {
+                            storeOpTable.put(op, tmpLp);
+                            lp.mergeSharedPlan(tmpLp);
+                        } else {
+                            --ignoreNumStores;
+                        }
                     }
-
                 }
             }
         }        
@@ -991,5 +951,46 @@
                 throw new FrontendException(msg, errCode, PigException.INPUT, 
false, null, e);
             }
         }
+
+        protected Graph clone() {
+            // There are two choices on how we clone the logical plan
+            // 1 - we really clone each operator and connect up the cloned 
operators
+            // 2 - we cache away the script till the point we need to clone
+            // and then simply re-parse the script. 
+            // The latter approach is used here
+            // FIXME: There is one open issue with this now:
+            // Consider the following script:
+            // A = load 'file:/somefile';
+            // B = filter A by $0 > 10;
+            // store B into 'bla';
+            // rm 'file:/somefile';
+            // A = load 'file:/someotherfile'
+            // when we try to clone - we try to reparse
+            // from the beginning and currently the parser
+            // checks for file existence of files in the load
+            // in the case where the file is a local one -i.e. with file: 
prefix
+            // This will be a known issue now and we will need to revisit later
+            
+            // parse each line of the cached script
+            int lineNumber = 1;
+            
+            // create data structures needed for parsing        
+            Graph graph = new Graph(true);
+            graph.ignoreNumStores = processedStores;
+            graph.processedStores = processedStores;
+            
+            try {
+                for (Iterator<String> it = getScriptCache().iterator(); 
it.hasNext(); lineNumber++) {
+                    if (isBatchOn()) {
+                        graph.registerQuery(it.next(), lineNumber);
+                    } else {
+                        graph.lp = graph.parseQuery(it.next(), lineNumber);
+                    }
+                }
+            } catch (IOException ioe) {
+                graph = null;
+            }
+            return graph;
+        }
     }
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Fri Mar  6 22:41:15 2009
@@ -72,11 +72,13 @@
         List<Job> failedJobs = new LinkedList<Job>();
         List<Job> succJobs = new LinkedList<Job>();
         JobControl jc;
-        int numMRJobs = mrp.size();
+        int totalMRJobs = mrp.size();
+        int numMRJobsCompl = 0;
+        int numMRJobsCurrent = 0;
         double lastProg = -1;
 
         while((jc = jcc.compile(mrp, grpName)) != null) {
-            numMRJobs += jc.getWaitingJobs().size();
+            numMRJobsCurrent = jc.getWaitingJobs().size();
 
             new Thread(jc).start();
             
@@ -84,7 +86,7 @@
                 try {
                     Thread.sleep(sleepTime);
                 } catch (InterruptedException e) {}
-                double prog = calculateProgress(jc, jobClient)/numMRJobs;
+                double prog = (numMRJobsCompl+calculateProgress(jc, 
jobClient))/totalMRJobs;
                 if(prog>=(lastProg+0.01)){
                     int perCom = (int)(prog * 100);
                     if(perCom!=100)
@@ -92,6 +94,7 @@
                 }
                 lastProg = prog;
             }
+            numMRJobsCompl += numMRJobsCurrent;
             failedJobs.addAll(jc.getFailedJobs());
             succJobs.addAll(jc.getSuccessfulJobs());
             jcc.moveResults();

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java 
(original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/grunt/GruntParser.java 
Fri Mar  6 22:41:15 2009
@@ -90,6 +90,23 @@
 
     private void init() {
         mDone = false;
+        mLoadOnly = false;
+    }
+
+    private void setBatchOn() {
+        mPigServer.setBatchOn();
+    }
+
+    private void executeBatch() throws IOException {
+        if (mPigServer.isBatchOn() && !mLoadOnly) {
+            mPigServer.executeBatch();
+        }
+    }
+
+    private void discardBatch() throws IOException {
+        if (mPigServer.isBatchOn()) {
+            mPigServer.discardBatch();
+        }
     }
     
     /** 
@@ -106,7 +123,7 @@
         }
 
         if (!mInteractive) {
-            mPigServer.setBatchOn();
+            setBatchOn();
         }
 
         try {
@@ -115,21 +132,17 @@
             while(!mDone) {
                 parse();
             }
-        } catch(IOException e) {
-            if (!mInteractive) {
-                mPigServer.discardBatch();
-            }
-            throw e;
-        } catch (ParseException e) {
-            if (!mInteractive) {
-                mPigServer.discardBatch();
-            }
-            throw e;
+            
+            executeBatch();
+        } 
+        finally {
+            discardBatch();
         }
+    }
 
-        if (!mInteractive) {
-            mPigServer.executeBatch();
-        }
+    public void setLoadOnly(boolean loadOnly) 
+    {
+        mLoadOnly = loadOnly;
     }
 
     public void setParams(PigServer pigServer)
@@ -183,14 +196,14 @@
         PrintStream out = System.out;
 
         if (script != null) {
-            mPigServer.setBatchOn();
+            setBatchOn();
             try {
-                loadScript(script, true, params, files);
+                loadScript(script, true, true, params, files);
             } catch(IOException e) {
-                mPigServer.discardBatch();
+                discardBatch();
                 throw e;
             } catch (ParseException e) {
-                mPigServer.discardBatch();
+                discardBatch();
                 throw e;
             }
         }
@@ -215,7 +228,7 @@
         }
         mPigServer.explain(alias, format, isVerbose, out, out, out);
         if (script != null) {
-            mPigServer.discardBatch();
+            discardBatch();
         }
     }
     
@@ -246,25 +259,26 @@
                                  List<String> params, List<String> files) 
         throws IOException, ParseException {
         
+        if (script == null) {
+            executeBatch();
+            return;
+        }
+        
         if (batch) {
-            mPigServer.setBatchOn();
+            setBatchOn();
             try {
-                loadScript(script, true, params, files);
-            } catch (IOException e) {
-                mPigServer.discardBatch();
-                throw e;
-            } catch (ParseException e) {
-                mPigServer.discardBatch();
-                throw e;
+                loadScript(script, true, mLoadOnly, params, files);
+                executeBatch();
+            } finally {
+                discardBatch();
             }
-            mPigServer.executeBatch();
         } else {
-            loadScript(script, false, params, files);
+            loadScript(script, false, mLoadOnly, params, files);
         }
     }
 
-    private void loadScript(String script, boolean batch, 
-                                 List<String> params, List<String> files) 
+    private void loadScript(String script, boolean batch, boolean loadOnly,
+                            List<String> params, List<String> files) 
         throws IOException, ParseException {
         
         Reader inputReader;
@@ -299,6 +313,7 @@
         parser.setParams(mPigServer);
         parser.setConsoleReader(reader);
         parser.setInteractive(interactive);
+        parser.setLoadOnly(loadOnly);
         
         parser.prompt();
         while(!parser.isDone()) {
@@ -344,6 +359,8 @@
     
     protected void processCat(String path) throws IOException
     {
+        executeBatch();
+
         try {
             byte buffer[] = new byte[65536];
             ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -518,6 +535,8 @@
 
     protected void processMove(String src, String dst) throws IOException
     {
+        executeBatch();
+
         try {
             ElementDescriptor srcPath = mDfs.asElement(src);
             ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -535,6 +554,8 @@
     
     protected void processCopy(String src, String dst) throws IOException
     {
+        executeBatch();
+
         try {
             ElementDescriptor srcPath = mDfs.asElement(src);
             ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -548,6 +569,8 @@
     
     protected void processCopyToLocal(String src, String dst) throws 
IOException
     {
+        executeBatch();
+
         try {
             ElementDescriptor srcPath = mDfs.asElement(src);
             ElementDescriptor dstPath = mLfs.asElement(dst);
@@ -561,6 +584,8 @@
 
     protected void processCopyFromLocal(String src, String dst) throws 
IOException
     {
+        executeBatch();
+
         try {
             ElementDescriptor srcPath = mLfs.asElement(src);
             ElementDescriptor dstPath = mDfs.asElement(dst);
@@ -596,6 +621,8 @@
     protected void processRemove(String path, String options ) throws 
IOException
     {
         ElementDescriptor dfsPath = mDfs.asElement(path);
+
+        executeBatch();
         
         if (!dfsPath.exists()) {
             if (options == null || !options.equalsIgnoreCase("force")) {
@@ -614,5 +641,6 @@
     private Properties mConf;
     private JobClient mJobClient;
     private boolean mDone;
+    private boolean mLoadOnly;
 
 }

Modified: 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
 (original)
+++ 
hadoop/pig/branches/multiquery/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
 Fri Mar  6 22:41:15 2009
@@ -530,7 +530,7 @@
 void Script() throws IOException:
 {
     Token t;
-    String script;
+    String script = null;
     boolean batch = false;
     ArrayList<String> params;
     ArrayList<String> files;
@@ -556,8 +556,10 @@
                t = GetPath()
                {files.add(t.image);}
        )*
-       t = GetPath()
-       {script = t.image;}
+       (
+               t = GetPath()
+               {script = t.image;}
+       )?
        {processScript(script, batch, params, files);}
 }
 

Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java 
(original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestGrunt.java Fri 
Mar  6 22:41:15 2009
@@ -471,4 +471,50 @@
     
         grunt.exec();
     }
+
+    @Test
+    public void testPartialExecution() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+        PigContext context = server.getPigContext();
+        
+        String strCmd = "rmf bar; rmf baz; a = load 
'file:test/org/apache/pig/test/data/passwd';"
+            +"store a into 'bar'; exec; a = load 'bar'; store a into 'baz';\n";
+        
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+    
+        grunt.exec();
+    }
+
+    @Test
+    public void testFileCmds() throws Throwable {
+        PigServer server = new PigServer(ExecType.MAPREDUCE, 
cluster.getProperties());
+        PigContext context = server.getPigContext();
+        
+        String strCmd = 
+            "rmf bar; rmf baz;"
+            +"a = load 'file:test/org/apache/pig/test/data/passwd';"
+            +"store a into 'bar';"
+            +"cp bar baz;"
+            +"rm bar; rm baz;"
+            +"store a into 'baz';"
+            +"store a into 'bar';"
+            +"rm baz; rm bar;"
+            +"store a into 'baz';"
+            +"mv baz bar;"
+            +"b = load 'bar';"
+            +"store b into 'baz';"
+            +"cat baz;"
+            +"rm baz;"
+            +"rm bar;\n";
+        
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+    
+        grunt.exec();
+    }
 }

Modified: 
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java 
(original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java 
Fri Mar  6 22:41:15 2009
@@ -94,6 +94,22 @@
     }
 
     @Test
+    public void testEmptyExecute() {
+        System.out.println("=== test empty execute ===");
+        
+        try {
+            myPig.setBatchOn();
+            myPig.executeBatch();
+            myPig.executeBatch();
+            myPig.discardBatch();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+        
+    @Test
     public void testMultiQueryWithTwoStores2() {
 
         System.out.println("===== test multi-query with 2 stores (2) =====");
@@ -119,6 +135,34 @@
     }
 
     @Test
+    public void testMultiQueryWithTwoStores2Execs() {
+
+        System.out.println("===== test multi-query with 2 stores (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.executeBatch();
+            myPig.registerQuery("store b into '/tmp/output1';");
+            myPig.executeBatch();
+            myPig.registerQuery("c = group b by gid;");
+            myPig.registerQuery("store c into '/tmp/output2';");
+
+            myPig.executeBatch();
+            myPig.discardBatch();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            deleteOutputFiles();
+        }
+    }
+
+    @Test
     public void testMultiQueryWithThreeStores() {
 
         System.out.println("===== test multi-query with 3 stores =====");
@@ -169,6 +213,7 @@
             myPig.registerQuery("store d into '/tmp/output3';");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -233,6 +278,7 @@
             myPig.registerQuery("store e into '/tmp/output3';");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -283,6 +329,7 @@
             myPig.registerQuery("group b by gid;");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();

Modified: 
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=751117&r1=751116&r2=751117&view=diff
==============================================================================
--- 
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
 (original)
+++ 
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQueryLocal.java
 Fri Mar  6 22:41:15 2009
@@ -71,7 +71,7 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
@@ -81,9 +81,6 @@
             // XXX Physical plan has one less node in the local case
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 10);
 
-            // XXX MR plan doesn't seem to work in the local case
-            checkMRPlan(pp, 2, 2, 2);
-
             Assert.assertTrue(executePlan(pp));
 
         } catch (Exception e) {
@@ -95,6 +92,22 @@
     }
 
     @Test
+    public void testEmptyExecute() {
+        System.out.println("=== test empty execute ===");
+        
+        try {
+            myPig.setBatchOn();
+            myPig.executeBatch();
+            myPig.executeBatch();
+            myPig.discardBatch();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+
+    @Test
     public void testMultiQueryWithTwoStores2() {
 
         System.out.println("===== test multi-query with 2 stores (2) =====");
@@ -104,12 +117,40 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.registerQuery("store b into '/tmp/output1';");
+            myPig.registerQuery("c = group b by gid;");
+            myPig.registerQuery("store c into '/tmp/output2';");
+
+            myPig.executeBatch();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            deleteOutputFiles();
+        }
+    }
+
+    @Test
+    public void testMultiQueryWithTwoStores2Execs() {
+
+        System.out.println("===== test multi-query with 2 stores (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
+                                "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
+            myPig.registerQuery("b = filter a by uid > 5;");
+            myPig.executeBatch();
             myPig.registerQuery("store b into '/tmp/output1';");
+            myPig.executeBatch();
             myPig.registerQuery("c = group b by gid;");
             myPig.registerQuery("store c into '/tmp/output2';");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -129,20 +170,17 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 1500;");
+            myPig.registerQuery("d = filter c by uid > 15;");
             myPig.registerQuery("store d into '/tmp/output3';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
 
-            // XXX MR plan doesn't seem to work in the local case
-            checkMRPlan(pp, 3, 3, 3);
-
             Assert.assertTrue(executePlan(pp));
 
         } catch (Exception e) {
@@ -163,14 +201,15 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("store b into '/tmp/output1';");
-            myPig.registerQuery("c = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output2';");
-            myPig.registerQuery("d = filter c by uid > 1500;");
+            myPig.registerQuery("d = filter c by uid > 15;");
             myPig.registerQuery("store d into '/tmp/output3';");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -192,8 +231,8 @@
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 500;");
-            myPig.registerQuery("d = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output1';");
             myPig.registerQuery("store d into '/tmp/output2';");
             myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -204,9 +243,6 @@
             // XXX the total number of ops is one less in the local case
             PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 18);
 
-            // XXX MR plan doesn't seem to work in the local case
-            checkMRPlan(pp, 4, 4, 4);
-
             Assert.assertTrue(executePlan(pp));
 
         } catch (Exception e) {
@@ -229,14 +265,15 @@
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
             myPig.registerQuery("b = load 
'file:test/org/apache/pig/test/data/passwd2' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("c = filter a by uid > 500;");
-            myPig.registerQuery("d = filter b by uid > 1000;");
+            myPig.registerQuery("c = filter a by uid > 5;");
+            myPig.registerQuery("d = filter b by uid > 10;");
             myPig.registerQuery("store c into '/tmp/output1';");
             myPig.registerQuery("store d into '/tmp/output2';");
             myPig.registerQuery("e = cogroup c by uid, d by uid;");
             myPig.registerQuery("store e into '/tmp/output3';");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -256,7 +293,7 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("group b by gid;");
 
             LogicalPlan lp = checkLogicalPlan(0, 0, 0);
@@ -264,8 +301,6 @@
             // XXX Physical plan has one less node in the local case
             PhysicalPlan pp = checkPhysicalPlan(lp, 0, 0, 0);
 
-            //checkMRPlan(pp, 0, 0, 0);
-
             //Assert.assertTrue(executePlan(pp));
 
         } catch (Exception e) {
@@ -284,10 +319,11 @@
 
             myPig.registerQuery("a = load 
'file:test/org/apache/pig/test/data/passwd' " +
                                 "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);");
-            myPig.registerQuery("b = filter a by uid > 500;");
+            myPig.registerQuery("b = filter a by uid > 5;");
             myPig.registerQuery("group b by gid;");
 
             myPig.executeBatch();
+            myPig.discardBatch();
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -304,7 +340,7 @@
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "explain b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -329,7 +365,7 @@
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "dump b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -354,7 +390,7 @@
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "describe b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -379,7 +415,7 @@
         try {
             String script = "a = load 
'file:test/org/apache/pig/test/data/passwd' "
                           + "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int,gid:int);"
-                          + "b = filter a by uid > 500;"
+                          + "b = filter a by uid > 5;"
                           + "illustrate b;"
                           + "store b into '/tmp/output1';\n";
             
@@ -467,25 +503,6 @@
         return pp;
     }
 
-    private MROperPlan checkMRPlan(PhysicalPlan pp, int expectedRoots,
-            int expectedLeaves, int expectedSize) throws IOException {
-
-        System.out.println("===== check map-reduce plan =====");
-
-        ExecTools.checkLeafIsStore(pp, myPig.getPigContext());
-        
-        MRCompiler mrcomp = new MRCompiler(pp, myPig.getPigContext());
-        MROperPlan mrp = mrcomp.compile();
-
-        Assert.assertEquals(expectedRoots, mrp.getRoots().size());
-        Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
-        Assert.assertEquals(expectedSize, mrp.size());
-
-        showPlanOperators(mrp);
-
-        return mrp;
-    }
-
     private boolean executePlan(PhysicalPlan pp) throws IOException {
         FileLocalizer.clearDeleteOnFail();
         ExecJob job = myPig.getPigContext().getExecutionEngine().execute(pp, 
"execute");


Reply via email to