Author: hashutosh
Date: Fri Feb 26 19:33:27 2010
New Revision: 916793

URL: http://svn.apache.org/viewvc?rev=916793&view=rev
Log:
PIG-1251: Move SortInfo calculation earlier in compilation 

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
    
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Feb 26 19:33:27 2010
@@ -58,6 +58,8 @@
 
 IMPROVEMENTS
 
+PIG-1251: Move SortInfo calculation earlier in compilation (ashutoshc)
+
 PIG-1233: NullPointerException in AVG  (ankur via olgan)
 
 PIG-1218: Use distributed cache to store samples (rding via pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Feb 26 19:33:27 2010
@@ -54,7 +54,13 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -66,12 +72,15 @@
 import 
org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
-import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.pen.ExampleGenerator;
@@ -837,6 +846,9 @@
         PlanSetter ps = new PlanSetter(lpClone);
         ps.visit();
         
+        SortInfoSetter sortInfoSetter = new SortInfoSetter(lpClone);
+        sortInfoSetter.visit();
+        
         // run through validator
         CompilationMessageCollector collector = new 
CompilationMessageCollector() ;
         FrontendException caught = null;
@@ -909,6 +921,56 @@
         }        
         return lp;
     }
+    
+    public static class SortInfoSetter extends LOVisitor{
+
+        public SortInfoSetter(LogicalPlan plan) {
+            super(plan, new DependencyOrderWalker<LogicalOperator, 
LogicalPlan>(plan));
+        }
+
+        @Override
+        protected void visit(LOStore store) throws VisitorException {
+            
+            LogicalOperator storePred = 
store.getPlan().getPredecessors(store).get(0);
+            if(storePred == null){
+                int errCode = 2051;
+                String msg = "Did not find a predecessor for Store." ;
+                throw new VisitorException(msg, errCode, PigException.BUG);    
+            }
+            
+            SortInfo sortInfo = null;
+            if(storePred instanceof LOLimit) {
+                storePred = store.getPlan().getPredecessors(storePred).get(0);
+            } else if (storePred instanceof LOSplitOutput) {
+                LOSplitOutput splitOutput = (LOSplitOutput)storePred;
+                // We assume this is the LOSplitOutput we injected for this 
case:
+                // b = order a by $0; store b into '1'; store b into '2';
+                // In this case, we should mark both '1' and '2' as sorted
+                LogicalPlan conditionPlan = splitOutput.getConditionPlan();
+                if (conditionPlan.getRoots().size()==1) {
+                    LogicalOperator root = conditionPlan.getRoots().get(0);
+                    if (root instanceof LOConst) {
+                        Object value = ((LOConst)root).getValue();
+                        if (value instanceof Boolean && (Boolean)value==true) {
+                            LogicalOperator split = 
splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+                            if (split instanceof LOSplit)
+                                storePred = 
store.getPlan().getPredecessors(split).get(0);
+                        }
+                    }
+                }
+            }
+            // if this predecessor is a sort, get
+            // the sort info.
+            if(storePred instanceof LOSort) {
+                try {
+                    sortInfo = ((LOSort)storePred).getSortInfo();
+                } catch (FrontendException e) {
+                    throw new VisitorException(e);
+                }
+            }
+            store.setSortInfo(sortInfo);
+        }
+    }
 
     /*
      * This class holds the internal states of a grunt shell session.

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Feb 26 19:33:27 2010
@@ -431,16 +431,12 @@
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
-                if (st.getSchema()!=null)
-                    sFunc.checkSchema(new ResourceSchema(st.getSchema(), 
st.getSortInfo()));
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
             }
 
             for (POStore st: reduceStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
-                if (st.getSchema()!=null)
-                    sFunc.checkSchema(new ResourceSchema(st.getSchema(), 
st.getSortInfo()));
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
             }
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Fri Feb 26 19:33:27 2010
@@ -1605,6 +1605,7 @@
         store.setSFile(loStore.getOutputFile());
         store.setInputSpec(loStore.getInputSpec());
         store.setSignature(loStore.getSignature());
+        store.setSortInfo(loStore.getSortInfo());
         try {
             // create a new schema for ourselves so that when
             // we serialize we are not serializing objects that
@@ -1621,47 +1622,8 @@
         }
         currentPlan.add(store);
         
-        List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore); 
-        PhysicalOperator from;
+        PhysicalOperator from = 
logToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
         
-        if(op != null) {
-            from = logToPhyMap.get(op.get(0));
-            SortInfo sortInfo = null;
-            // if store's predecessor is limit,
-            // check limit's predecessor
-            if(op.get(0) instanceof LOLimit) {
-                op = loStore.getPlan().getPredecessors(op.get(0));
-            } else if (op.get(0) instanceof LOSplitOutput) {
-                LOSplitOutput splitOutput = (LOSplitOutput)op.get(0);
-                // We assume this is the LOSplitOutput we injected for this 
case:
-                // b = order a by $0; store b into '1'; store b into '2';
-                // In this case, we should mark both '1' and '2' as sorted
-                LogicalPlan conditionPlan = splitOutput.getConditionPlan();
-                if (conditionPlan.getRoots().size()==1) {
-                    LogicalOperator root = conditionPlan.getRoots().get(0);
-                    if (root instanceof LOConst) {
-                        Object value = ((LOConst)root).getValue();
-                        if (value instanceof Boolean && (Boolean)value==true) {
-                            LogicalOperator split = 
splitOutput.getPlan().getPredecessors(splitOutput).get(0);
-                            if (split instanceof LOSplit)
-                                op = loStore.getPlan().getPredecessors(split);
-                        }
-                    }
-                }
-            }
-            PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
-            // if this predecessor is a sort, get
-            // the sort info.
-            if(op.get(0) instanceof LOSort) {
-                sortInfo = ((POSort)sortPhyOp).getSortInfo();
-            }
-            store.setSortInfo(sortInfo);
-        } else {
-            int errCode = 2051;
-            String msg = "Did not find a predecessor for Store." ;
-            throw new LogicalToPhysicalTranslatorException(msg, errCode, 
PigException.BUG);            
-        }        
-
         try {
             currentPlan.connect(from, store);
         } catch (PlanException e) {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Fri Feb 
26 19:33:27 2010
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.SortInfo;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -47,6 +48,16 @@
 
     transient private StoreFuncInterface mStoreFunc;
     private static Log log = LogFactory.getLog(LOStore.class);
+    
+    private SortInfo sortInfo;
+
+    public SortInfo getSortInfo() {
+        return sortInfo;
+    }
+
+    public void setSortInfo(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
 
     /**
      * @param plan

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
 Fri Feb 26 19:33:27 2010
@@ -21,8 +21,10 @@
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.impl.PigContext ;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
@@ -65,10 +67,23 @@
 
         StoreFuncInterface sf = store.getStoreFunc();
         String outLoc = store.getOutputFile().getFileName();
-        Job dummyJob;
         String errMsg = "Unexpected error. Could not validate the output " +
-                       "specification for: "+outLoc;
+        "specification for: "+outLoc;
         int errCode = 2116;
+
+        try {
+            if(store.getSchema() != null){
+                sf.checkSchema(new ResourceSchema(store.getSchema(), 
store.getSortInfo()));                
+            }
+        } catch (FrontendException e) {
+            msgCollector.collect(errMsg, MessageType.Error) ;
+            throw new PlanValidationException(errMsg, errCode, 
pigCtx.getErrorSource(), e);
+        } catch (IOException e) {
+            msgCollector.collect(errMsg, MessageType.Error) ;
+            throw new PlanValidationException(errMsg, errCode, 
pigCtx.getErrorSource(), e);
+        }
+
+        Job dummyJob;
         
         try {
             dummyJob = new 
Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
 Fri Feb 26 19:33:27 2010
@@ -62,9 +62,7 @@
         if (!pigContext.inExplain) {
             // When running explain we don't want to check for input
             // files.
-            // Temporarily disabling InputOutputFileValidator on trunk
-            // till PIG-1251 and PIG-1216 are addressed.
-            //validatorList.add(new InputOutputFileValidator(pigContext)) ;
+            validatorList.add(new InputOutputFileValidator(pigContext)) ;
         }
         // This one has to be done before the type checker.
         //validatorList.add(new TypeCastInserterValidator()) ;

Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInputOutputFileValidator.java 
Fri Feb 26 19:33:27 2010
@@ -65,8 +65,6 @@
     }
     
        
-// Comment out until PIG-1251 solved
-/*
     @Test
     public void testLocalModeNegative2() throws Throwable {
         
@@ -93,7 +91,6 @@
         }        
 
     }
-*/    
         
     @Test
     public void testMapReduceModeInputPositive() throws Throwable {
@@ -114,8 +111,6 @@
 
     }
     
-// Comment out until PIG-1251 solved
-/*       
     @Test
     public void testMapReduceModeInputNegative2() throws Throwable {
         
@@ -142,7 +137,6 @@
         }       
 
     }
-*/    
         
     private LogicalPlan genNewLoadStorePlan(String inputFile,
                                             String outputFile, DataStorage 
dfs) 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=916793&r1=916792&r2=916793&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java Fri Feb 
26 19:33:27 2010
@@ -613,6 +613,8 @@
         lpt.buildPlan("a = load 'bla' as (i:int, n:chararray, d:double);");
         lpt.buildPlan("b = order a by i, d;");
         LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         PhysicalPlan pp = buildPhysicalPlan(lp);
         SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         SortInfo expected = getSortInfo(
@@ -635,6 +637,8 @@
         lpt.buildPlan("b = filter a by i > 10;");
         lpt.buildPlan("c = order b by i desc, d;");
         LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         PhysicalPlan pp = buildPhysicalPlan(lp);
         SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         SortInfo expected = getSortInfo(
@@ -657,6 +661,8 @@
         lpt.buildPlan("a = load 'bla' as (i:int, n:chararray, d:double);");
         lpt.buildPlan("b = filter a by i > 10;");
         LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         PhysicalPlan pp = buildPhysicalPlan(lp);
         SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         assertEquals(null, si);
@@ -675,6 +681,8 @@
         lpt.buildPlan("c = filter b by i > 10;");
         LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
         PhysicalPlan pp = buildPhysicalPlan(lp);
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         assertEquals(null, si);
     }
@@ -691,6 +699,8 @@
         lpt.buildPlan("b = order a by i, d desc;");
         lpt.buildPlan("c = limit b 10;");
         LogicalPlan lp = lpt.buildPlan("store c into 'foo';");
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         LOPrinter lpr = new LOPrinter(System.err, lp);
         lpr.visit();
         PhysicalPlan pp = buildPhysicalPlan(lp);
@@ -750,6 +760,8 @@
         lpt.buildPlan("a = load 'bla' ;");
         lpt.buildPlan("b = order a by $0;");
         LogicalPlan lp = lpt.buildPlan("store b into 'foo';");
+        PigServer.SortInfoSetter siSetter = new PigServer.SortInfoSetter(lp); 
+        siSetter.visit();
         PhysicalPlan pp = buildPhysicalPlan(lp);
         SortInfo si = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
         SortInfo expected = getSortInfo(Arrays.asList(new String[] {null}),


Reply via email to