Author: pradeepkth
Date: Tue Jan 26 21:25:31 2010
New Revision: 903423

URL: http://svn.apache.org/viewvc?rev=903423&view=rev
Log:
PIG-1090: additional patch (daijy via pradeepkth)

Modified:
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
    
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java 
(original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java 
Tue Jan 26 21:25:31 2010
@@ -149,6 +149,27 @@
         }        
     }
     
+    public ResourceSchema(Schema pigSchema, SortInfo sortInfo) {
+        this(pigSchema);
+        if (sortInfo!=null && sortInfo.getSortColInfoList().size()!=0) {
+            sortKeys = new int[sortInfo.getSortColInfoList().size()];
+            sortKeyOrders = new Order[sortInfo.getSortColInfoList().size()];
+            for (int i=0;i<sortInfo.getSortColInfoList().size();i++) {
+                SortColInfo colInfo = sortInfo.getSortColInfoList().get(i); 
+                int index = colInfo.getColIndex();
+                Order order;
+                org.apache.pig.SortColInfo.Order origOrder = 
colInfo.getSortOrder();
+                if (origOrder==org.apache.pig.SortColInfo.Order.ASCENDING) {
+                    order = Order.ASCENDING;
+                } else {
+                    order = Order.DESCENDING;
+                }
+                sortKeys[i] = index;
+                sortKeyOrders[i] = order;
+            }
+        }
+    }
+    
     public int getVersion() {
         return version;
     }

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Tue Jan 26 21:25:31 2010
@@ -444,17 +444,17 @@
             for (POStore st: mapStores) {
                 storeLocations.add(st);
                 StoreFunc sFunc = st.getStoreFunc();
-                //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
                 if (st.getSchema()!=null)
-                    sFunc.checkSchema(new ResourceSchema(st.getSchema()));
+                    sFunc.checkSchema(new ResourceSchema(st.getSchema(), 
st.getSortInfo()));
             }
 
             for (POStore st: reduceStores) {
                 storeLocations.add(st);
                 StoreFunc sFunc = st.getStoreFunc();
-                //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
                 if (st.getSchema()!=null)
-                    sFunc.checkSchema(new ResourceSchema(st.getSchema()));
+                    sFunc.checkSchema(new ResourceSchema(st.getSchema(), 
st.getSortInfo()));
             }
 
             // the OutputFormat we report to Hadoop is always PigOutputFormat

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 Tue Jan 26 21:25:31 2010
@@ -159,7 +159,7 @@
             Schema schema = store.getSchema();
             if (schema != null) {
                 ((StoreMetadata) storeFunc).storeSchema(
-                        new ResourceSchema(schema), store.getSFile()
+                        new ResourceSchema(schema, store.getSortInfo()), 
store.getSFile()
                                 .getFileName(), conf);
             }
         }

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
 Tue Jan 26 21:25:31 2010
@@ -1630,6 +1630,23 @@
             // check limit's predecessor
             if(op.get(0) instanceof LOLimit) {
                 op = loStore.getPlan().getPredecessors(op.get(0));
+            } else if (op.get(0) instanceof LOSplitOutput) {
+                LOSplitOutput splitOutput = (LOSplitOutput)op.get(0);
+                // We assume this is the LOSplitOutput we injected for this 
case:
+                // b = order a by $0; store b into '1'; store b into '2';
+                // In this case, we should mark both '1' and '2' as sorted
+                LogicalPlan conditionPlan = splitOutput.getConditionPlan();
+                if (conditionPlan.getRoots().size()==1) {
+                    LogicalOperator root = conditionPlan.getRoots().get(0);
+                    if (root instanceof LOConst) {
+                        Object value = ((LOConst)root).getValue();
+                        if (value instanceof Boolean && (Boolean)value==true) {
+                            LogicalOperator split = 
splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+                            if (split instanceof LOSplit)
+                                op = loStore.getPlan().getPredecessors(split);
+                        }
+                    }
+                }
             }
             PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
             // if this predecessor is a sort, get

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
 Tue Jan 26 21:25:31 2010
@@ -34,6 +34,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.SortColInfo;
 import org.apache.pig.SortInfo;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -704,6 +705,41 @@
     }
     
     /**
+     * tests that sortInfo is not null when there are multiple store
+     * @throws Exception
+     */
+    @Test
+    public void testSortInfoMultipleStore() throws Exception {
+        PigServer myPig = new PigServer(ExecType.MAPREDUCE);
+        myPig.setBatchOn();
+        myPig.registerQuery("a = load 'bla' as (i:int, n:chararray, 
d:double);");
+        myPig.registerQuery("b = order a by i, d desc;");
+        myPig.registerQuery("store b into '1';");
+        myPig.registerQuery("store b into '2';");
+        java.lang.reflect.Method compileLp = myPig.getClass()
+            .getDeclaredMethod("compileLp",
+            new Class[] { String.class });
+
+        compileLp.setAccessible(true);
+
+        LogicalPlan lp = (LogicalPlan) compileLp.invoke(myPig, new Object[] { 
null });
+        LOPrinter lpr = new LOPrinter(System.err, lp);
+        lpr.visit();
+        
+        PhysicalPlan pp = buildPhysicalPlan(lp);
+        SortInfo si0 = ((POStore)(pp.getLeaves().get(0))).getSortInfo();
+        SortInfo si1 = ((POStore)(pp.getLeaves().get(1))).getSortInfo();
+        SortInfo expected = getSortInfo(
+                Arrays.asList(new String[] {"i", "d"}), 
+                Arrays.asList(new Integer[] {0, 2}),
+                Arrays.asList(new SortColInfo.Order[] {
+                        SortColInfo.Order.ASCENDING, 
+                        SortColInfo.Order.DESCENDING}));
+        assertEquals(expected, si0);
+        assertEquals(expected, si1);
+    }
+    
+    /**
      * tests that sortInfo is null when there is no schema for order by
      * before the store
      * @throws Exception

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
 Tue Jan 26 21:25:31 2010
@@ -21,7 +21,12 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.SortColInfo;
+import org.apache.pig.SortInfo;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
@@ -82,6 +87,40 @@
     }
     
     /**
+     * Test that ResourceSchema is correctly with SortInfo
+     */
+    @Test
+    public void testResourceFlatSchemaCreationWithSortInfo() 
+    throws ExecException, SchemaMergeException, FrontendException {
+        String [] aliases ={"f1", "f2"};
+        byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
+        
+        Schema origSchema = new Schema(
+                new Schema.FieldSchema("t1", 
+                        new Schema(
+                                new Schema.FieldSchema("t0", 
+                                        TypeCheckingTestUtil.genFlatSchema(
+                                                aliases,types), 
+                                                DataType.TUPLE)), 
DataType.BAG));
+        List<SortColInfo> colList = new ArrayList<SortColInfo>();
+        SortColInfo col1 = new SortColInfo("f1", 0, 
SortColInfo.Order.ASCENDING);
+        SortColInfo col2 = new SortColInfo("f1", 1, 
SortColInfo.Order.DESCENDING);
+        colList.add(col1);
+        colList.add(col2);
+        SortInfo sortInfo = new SortInfo(colList);
+                        
+        ResourceSchema rsSchema = new ResourceSchema(origSchema, sortInfo);
+
+        Schema genSchema = Schema.getPigSchema(rsSchema);
+        assertTrue("generated schema equals original", 
+                Schema.equals(genSchema, origSchema, true, false));
+        assertTrue(rsSchema.getSortKeys()[0]==0);
+        assertTrue(rsSchema.getSortKeys()[1]==1);
+        
assertTrue(rsSchema.getSortKeyOrders()[0]==ResourceSchema.Order.ASCENDING);
+        
assertTrue(rsSchema.getSortKeyOrders()[1]==ResourceSchema.Order.DESCENDING);
+    }
+    
+    /**
      * Test that Pig Schema is correctly created given a
      * ResourceSchema and vice versa. Test also that 
      * TwoLevelAccess flag is set for Pig Schema when needed.


Reply via email to