Author: olga
Date: Fri Sep 12 11:56:30 2008
New Revision: 694774

URL: http://svn.apache.org/viewvc?rev=694774&view=rev
Log:
Code to clone operators

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 12 11:56:30 2008
@@ -190,3 +190,4 @@
 
     PIG-422: cross is broken (shravanmn via olgan)
 
+    PIG-407: need to clone operators (pradeepk via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 12 11:56:30 2008
@@ -57,7 +57,9 @@
 import 
org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.util.PropertiesUtil;
@@ -94,6 +96,7 @@
     PigContext pigContext;
     
     private String scope = constructScope();
+    private ArrayList<String> cachedScript = new ArrayList<String>();
     
     private String constructScope() {
         // scope servers for now as a session id
@@ -251,27 +254,14 @@
      * @throws IOException
      */    
     public void registerQuery(String query, int startLine) throws IOException {
-        // Bugzilla Bug 1006706 -- ignore empty queries
-        //=============================================
-        if(query != null) {
-            query = query.trim();
-            if(query.length() == 0) return;
-        }else {
-            return;
-        }
             
-        LogicalPlan lp = null;
-        LogicalOperator op = null;
-        try {
-            lp = (new LogicalPlanBuilder(pigContext).parse(scope, query,
-                    aliases, opTable, aliasOp, startLine));
-        } catch (ParseException e) {
-            throw (IOException) new IOException(e.getMessage()).initCause(e);
-        }
+        LogicalPlan lp = parseQuery(query, startLine, aliases, opTable, 
aliasOp);
+        // store away the query for use in cloning later
+        cachedScript .add(query);
         
         if (lp.getLeaves().size() == 1)
         {
-            op = lp.getSingleLeafPlanOutputOp();
+            LogicalOperator op = lp.getSingleLeafPlanOutputOp();
             // No need to do anything about DEFINE 
             if (op instanceof LODefine) {
                 return;
@@ -280,14 +270,76 @@
             // Check if we just processed a LOStore i.e. STORE
             if (op instanceof LOStore) {
                 try{
-                    execute(lp);
+                    execute(null);
                 } catch (Exception e) {
                     throw WrappedIOException.wrap("Unable to store for alias: 
" + op.getOperatorKey().getId(), e);
                 }
             }
         }
     }
+    
+    private LogicalPlan parseQuery(String query, int startLine, 
Map<LogicalOperator, LogicalPlan> aliasesMap, 
+            Map<OperatorKey, LogicalOperator> opTableMap, Map<String, 
LogicalOperator> aliasOpMap) throws IOException {
+        if(query != null) {
+            query = query.trim();
+            if(query.length() == 0) return null;
+        }else {
+            return null;
+        }
+        try {
+            return new LogicalPlanBuilder(pigContext).parse(scope, query,
+                    aliasesMap, opTableMap, aliasOpMap, startLine);
+        } catch (ParseException e) {
+            throw (IOException) new IOException(e.getMessage()).initCause(e);
+        }
+    }
 
+    public LogicalPlan clonePlan(String alias) throws IOException {
+        // There are two choices on how we clone the logical plan
+        // 1 - we really clone each operator and connect up the cloned 
operators
+        // 2 - we cache away the script till the point we need to clone
+        // and then simply re-parse the script. 
+        // The latter approach is used here
+        // FIXME: There is one open issue with this now:
+        // Consider the following script:
+        // A = load 'file:/somefile';
+        // B = filter A by $0 > 10;
+        // store B into 'bla';
+        // rm 'file:/somefile';
+        // A = load 'file:/someotherfile'
+        // when we try to clone - we try to reparse
+        // from the beginning and currently the parser
+        // checks for file existence of files in the load
+        // in the case where the file is a local one -i.e. with file: prefix
+        // This will be a known issue now and we will need to revisit later
+        
+        // parse each line of the cached script and the
+        // final logical plan is the clone that we want
+        LogicalPlan lp = null;
+        int lineNumber = 1;
+        // create data structures needed for parsing
+        Map<LogicalOperator, LogicalPlan> cloneAliases = new 
HashMap<LogicalOperator, LogicalPlan>();
+        Map<OperatorKey, LogicalOperator> cloneOpTable = new 
HashMap<OperatorKey, LogicalOperator>();
+        Map<String, LogicalOperator> cloneAliasOp = new HashMap<String, 
LogicalOperator>();
+        for (Iterator<String> it = cachedScript.iterator(); it.hasNext(); 
lineNumber++) {
+            lp = parseQuery(it.next(), lineNumber, cloneAliases, cloneOpTable, 
cloneAliasOp);
+        }
+        
+        if(alias == null) {
+            // a store prompted the execution - so return
+            // the entire logical plan
+            return lp;
+        } else {
+            // return the logical plan corresponding to the 
+            // alias supplied
+            LogicalOperator op = cloneAliasOp.get(alias);
+            if(op == null) {
+                throw new IOException("Unable to find an operator for alias " 
+ alias);
+            }
+            return cloneAliases.get(op);
+        }
+    }
+    
     public void registerQuery(String query) throws IOException {
         registerQuery(query, 1);
     }
@@ -296,7 +348,7 @@
         try {
             LogicalPlan lp = getPlanFromAlias(alias, "describe");
             try {
-                lp = compileLp(lp, "describe", false);
+                lp = compileLp(alias, false);
             } catch (ExecException e) {
                 throw new FrontendException(e.getMessage());
             }
@@ -373,9 +425,26 @@
             String filename,
             String func) throws IOException {
         try {
-            LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
-                scope, readFrom, filename, func, aliasOp.get(id), aliases);
-            return execute(storePlan);
+            LogicalPlan lp = compileLp(id);
+            
+            // MRCompiler needs a store to be the leaf - hence
+            // add a store to the plan to explain
+            
+            // figure out the leaf to which the store needs to be added
+            List<LogicalOperator> leaves = lp.getLeaves();
+            LogicalOperator leaf = null;
+            if(leaves.size() == 1) {
+                leaf = leaves.get(0);
+            } else {
+                for (Iterator<LogicalOperator> it = leaves.iterator(); 
it.hasNext();) {
+                    LogicalOperator leafOp = it.next();
+                    if(leafOp.getAlias().equals(id))
+                        leaf = leafOp;
+                }
+            }
+            
+            LogicalPlan storePlan = QueryParser.generateStorePlan(scope, lp, 
filename, func, leaf);
+            return executeCompiledLogicalPlan(storePlan);
         } catch (Exception e) {
             throw WrappedIOException.wrap("Unable to store for alias: " +
                 id, e);
@@ -394,20 +463,27 @@
     public void explain(String alias,
                         PrintStream stream) throws IOException {
         try {
-            LogicalOperator op = aliasOp.get(alias);
-            if(null == op) {
-                throw new IOException("Unable to find an operator for alias " 
+ alias);
+            LogicalPlan lp = compileLp(alias);
+            
+            // MRCompiler needs a store to be the leaf - hence
+            // add a store to the plan to explain
+            
+            // figure out the leaf to which the store needs to be added
+            List<LogicalOperator> leaves = lp.getLeaves();
+            LogicalOperator leaf = null;
+            for (Iterator<LogicalOperator> it = leaves.iterator(); 
it.hasNext();) {
+                LogicalOperator leafOp = it.next();
+                if(leafOp.getAlias().equals(alias))
+                    leaf = leafOp;
             }
-            LogicalPlan storePlan = QueryParser.generateStorePlan(opTable,
-                scope, getPlanFromAlias(alias, op.getClass().getName()),
-                "fakefile", PigStorage.class.getName(), aliasOp.get(alias),
-                aliases);
-            LogicalPlan lp = compileLp(storePlan, "explain");
+            
+            LogicalPlan storePlan = QueryParser.generateStorePlan(
+                scope, lp, "fakefile", PigStorage.class.getName(), leaf);
             stream.println("Logical Plan:");
-            LOPrinter lv = new LOPrinter(stream, lp);
+            LOPrinter lv = new LOPrinter(stream, storePlan);
             lv.visit();
 
-            PhysicalPlan pp = compilePp(lp);
+            PhysicalPlan pp = compilePp(storePlan);
             stream.println("-----------------------------------------------");
             stream.println("Physical Plan:");
 
@@ -530,13 +606,18 @@
         // pigContext.getExecutionEngine().reclaimScope(this.scope);
     }
 
-    private ExecJob execute(
-            LogicalPlan lp) throws FrontendException, ExecException {
+    private ExecJob execute(String alias) throws FrontendException, 
ExecException {
         ExecJob job = null;
 //        lp.explain(System.out, System.err);
-        LogicalPlan typeCheckedLp = compileLp(lp, "execute");
+        LogicalPlan typeCheckedLp = compileLp(alias);
+        
+        return executeCompiledLogicalPlan(typeCheckedLp);
 //        typeCheckedLp.explain(System.out, System.err);
-        PhysicalPlan pp = compilePp(typeCheckedLp);
+        
+    }
+    
+    private ExecJob executeCompiledLogicalPlan(LogicalPlan compiledLp) throws 
ExecException {
+        PhysicalPlan pp = compilePp(compiledLp);
         // execute using appropriate engine
         FileLocalizer.clearDeleteOnFail();
         ExecJob execJob = pigContext.getExecutionEngine().execute(pp, 
"execute");
@@ -546,24 +627,26 @@
     }
 
     private LogicalPlan compileLp(
-            LogicalPlan lp,
-            String operation) throws ExecException, FrontendException {
-        return compileLp(lp, operation, true);
+            String alias) throws ExecException, FrontendException {
+        return compileLp(alias, true);
     }
 
     private LogicalPlan compileLp(
-            LogicalPlan lp,
-            String operation,
+            String alias,
             boolean optimize) throws ExecException, FrontendException {
-        // Look up the logical plan in the aliases map.  That plan will be
-        // properly connected to all the others.
-
-        if(null == lp) {
-            throw new FrontendException("Cannot operate on null logical plan");
+        
+        // create a clone of the logical plan and give it
+        // to the operations below
+        LogicalPlan lpClone;
+        try {
+             lpClone = clonePlan(alias);
+        } catch (IOException e) {
+            throw new FrontendException("Unable to clone plan before 
compiling", e);
         }
 
+        
         // Set the logical plan values correctly in all the operators
-        PlanSetter ps = new PlanSetter(lp);
+        PlanSetter ps = new PlanSetter(lpClone);
         ps.visit();
         
         //(new SplitIntroducer(lp)).introduceImplSplits();
@@ -573,8 +656,8 @@
         FrontendException caught = null;
         try {
             LogicalPlanValidationExecutor validator = 
-                new LogicalPlanValidationExecutor(lp, pigContext);
-            validator.validate(lp, collector);
+                new LogicalPlanValidationExecutor(lpClone, pigContext);
+            validator.validate(lpClone, collector);
         } catch (FrontendException fe) {
             // Need to go through and see what the collector has in it.  But
             // remember what we've caught so we can wrap it into what we
@@ -612,11 +695,11 @@
 
         // optimize
         if (optimize) {
-            LogicalOptimizer optimizer = new LogicalOptimizer(lp);
+            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
             optimizer.optimize();
         }
 
-        return lp;
+        return lpClone;
     }
 
     private PhysicalPlan compilePp(LogicalPlan lp) throws ExecException {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Sep 12 11:56:30 2008
@@ -110,13 +110,11 @@
             return str;
     }
 
-    public static LogicalPlan generateStorePlan(Map<OperatorKey, 
LogicalOperator> opTable,
-                                                                               
    String scope,
+    public static LogicalPlan generateStorePlan(String scope,
                                                 LogicalPlan readFrom,
                                                 String fileName,
                                                 String func,
-                                                LogicalOperator input,
-                                                Map<LogicalOperator, 
LogicalPlan> aliases) throws FrontendException {
+                                                LogicalOperator input) throws 
FrontendException {
 
         if (func == null) {
             func = PigStorage.class.getName();
@@ -146,8 +144,6 @@
             throw new FrontendException(pe.getMessage());
         }
            
-        aliases.put(store, storePlan);
-
         if (storePlan.getRoots().size() == 0) throw new 
RuntimeException("Store plan has no roots!");
         return storePlan;
     }
@@ -298,6 +294,7 @@
                LogicalOperator splitOutput = new LOSplitOutput(lp, new 
OperatorKey(scope, getNextId()), index, condPlan);
                splitOp.addOutput(splitOutput);
                addAlias(alias, splitOutput);
+        splitOutput.setAlias(alias);
         addLogicalPlan(splitOutput, lp);
                
                lp.add(splitOutput);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=694774&r1=694773&r2=694774&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Fri Sep 12 11:56:30 2008
@@ -53,9 +53,9 @@
     MiniCluster cluster = MiniCluster.buildCluster();
     @Test
     public void testGroupCountWithMultipleFields() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
         for (int k = 0; k < nullFlags.length; k++) {
             System.err.println("Running testGroupCountWithMultipleFields with 
nullFlags set to " + nullFlags[k]);
-            File tmpFile = File.createTempFile("test", "txt");
             // flag to indicate if both the keys forming
             // the group key are null
             int groupKeyWithNulls = 0;
@@ -98,7 +98,6 @@
             pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by 
($0,$1);");
             pig.registerQuery("b = foreach a generate flatten(group), 
SUM($1.$2);");
             Iterator<Tuple> it = pig.openIterator("b");
-            tmpFile.delete();
             int count = 0;
             System.err.println("XX Starting");
             while(it.hasNext()){
@@ -123,6 +122,7 @@
                 assertEquals("Running testGroupCountWithMultipleFields with 
nullFlags set to " + nullFlags[k], LOOP_COUNT - groupKeyWithNulls + 1, count);
             
         }
+        tmpFile.delete();
         
     }
     


Reply via email to