Author: pradeepkth Date: Tue Jan 26 21:25:31 2010 New Revision: 903423 URL: http://svn.apache.org/viewvc?rev=903423&view=rev Log: PIG-1090: additional patch (daijy via pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Tue Jan 26 21:25:31 2010 @@ -149,6 +149,27 @@ } } + public ResourceSchema(Schema pigSchema, SortInfo sortInfo) { + this(pigSchema); + if (sortInfo!=null && sortInfo.getSortColInfoList().size()!=0) { + sortKeys = new int[sortInfo.getSortColInfoList().size()]; + sortKeyOrders = new Order[sortInfo.getSortColInfoList().size()]; + for (int i=0;i<sortInfo.getSortColInfoList().size();i++) { + SortColInfo colInfo = sortInfo.getSortColInfoList().get(i); + int index = colInfo.getColIndex(); + Order order; + org.apache.pig.SortColInfo.Order origOrder = colInfo.getSortOrder(); + if (origOrder==org.apache.pig.SortColInfo.Order.ASCENDING) { + order = Order.ASCENDING; + } else { + order = Order.DESCENDING; + } + sortKeys[i] = index; + sortKeyOrders[i] = order; + } + } + } + public int getVersion() { return version; } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jan 26 21:25:31 2010 @@ -444,17 +444,17 @@ for (POStore st: mapStores) { storeLocations.add(st); StoreFunc sFunc = st.getStoreFunc(); - //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); + sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); if (st.getSchema()!=null) - sFunc.checkSchema(new ResourceSchema(st.getSchema())); + sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo())); } for (POStore st: reduceStores) { storeLocations.add(st); StoreFunc sFunc = st.getStoreFunc(); - //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); + sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob); if (st.getSchema()!=null) - sFunc.checkSchema(new ResourceSchema(st.getSchema())); + sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo())); } // the OutputFormat we report to Hadoop is always PigOutputFormat Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Jan 26 21:25:31 2010 @@ -159,7 +159,7 @@ Schema schema = store.getSchema(); if (schema != null) { ((StoreMetadata) storeFunc).storeSchema( - new ResourceSchema(schema), store.getSFile() + new ResourceSchema(schema, store.getSortInfo()), store.getSFile() .getFileName(), conf); } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Jan 26 21:25:31 2010 @@ -1630,6 +1630,23 @@ // check limit's predecessor if(op.get(0) instanceof LOLimit) { op = loStore.getPlan().getPredecessors(op.get(0)); + } else if (op.get(0) instanceof LOSplitOutput) { + LOSplitOutput splitOutput = (LOSplitOutput)op.get(0); + // We assume this is the LOSplitOutput we injected for this case: + // b = order a by $0; store b into '1'; store b into '2'; + // In this case, we should mark both '1' and '2' as sorted + LogicalPlan conditionPlan = splitOutput.getConditionPlan(); + if (conditionPlan.getRoots().size()==1) { + LogicalOperator root = conditionPlan.getRoots().get(0); + if (root instanceof LOConst) { + Object value = ((LOConst)root).getValue(); + if (value instanceof Boolean && (Boolean)value==true) { + LogicalOperator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0); + if (split instanceof LOSplit) + op = loStore.getPlan().getPredecessors(split); + } + } + } } PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0)); // if this predecessor is a sort, get Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java Tue Jan 26 21:25:31 2010 @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.SortColInfo; import org.apache.pig.SortInfo; import org.apache.pig.backend.executionengine.ExecException; @@ -704,6 +705,41 @@ } /** + * tests that sortInfo is not null when there are multiple store + * @throws Exception + */ + @Test + public void testSortInfoMultipleStore() throws Exception { + PigServer myPig = new PigServer(ExecType.MAPREDUCE); + myPig.setBatchOn(); + myPig.registerQuery("a = load 'bla' as (i:int, n:chararray, d:double);"); + myPig.registerQuery("b = order a by i, d desc;"); + myPig.registerQuery("store b into '1';"); + myPig.registerQuery("store b into '2';"); + java.lang.reflect.Method compileLp = myPig.getClass() + .getDeclaredMethod("compileLp", + new Class[] { String.class }); + + compileLp.setAccessible(true); + + LogicalPlan lp = (LogicalPlan) compileLp.invoke(myPig, new Object[] { null }); + LOPrinter lpr = new LOPrinter(System.err, lp); + lpr.visit(); + + PhysicalPlan pp = buildPhysicalPlan(lp); + SortInfo si0 = ((POStore)(pp.getLeaves().get(0))).getSortInfo(); + SortInfo si1 = ((POStore)(pp.getLeaves().get(1))).getSortInfo(); + SortInfo expected = getSortInfo( + Arrays.asList(new String[] {"i", "d"}), + Arrays.asList(new Integer[] {0, 2}), + Arrays.asList(new SortColInfo.Order[] { + SortColInfo.Order.ASCENDING, + SortColInfo.Order.DESCENDING})); + assertEquals(expected, si0); + assertEquals(expected, si1); + } + + /** * tests that sortInfo is null when there is no schema for order by * before the store * @throws Exception Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java Tue Jan 26 21:25:31 2010 @@ -21,7 +21,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; + import org.apache.pig.ResourceSchema; +import org.apache.pig.SortColInfo; +import org.apache.pig.SortInfo; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; @@ -82,6 +87,40 @@ } /** + * Test that ResourceSchema is correctly with SortInfo + */ + @Test + public void testResourceFlatSchemaCreationWithSortInfo() + throws ExecException, SchemaMergeException, FrontendException { + String [] aliases ={"f1", "f2"}; + byte[] types = {DataType.CHARARRAY, DataType.INTEGER}; + + Schema origSchema = new Schema( + new Schema.FieldSchema("t1", + new Schema( + new Schema.FieldSchema("t0", + TypeCheckingTestUtil.genFlatSchema( + aliases,types), + DataType.TUPLE)), DataType.BAG)); + List<SortColInfo> colList = new ArrayList<SortColInfo>(); + SortColInfo col1 = new SortColInfo("f1", 0, SortColInfo.Order.ASCENDING); + SortColInfo col2 = new SortColInfo("f1", 1, SortColInfo.Order.DESCENDING); + colList.add(col1); + colList.add(col2); + SortInfo sortInfo = new SortInfo(colList); + + ResourceSchema rsSchema = new ResourceSchema(origSchema, sortInfo); + + Schema genSchema = Schema.getPigSchema(rsSchema); + assertTrue("generated schema equals original", + Schema.equals(genSchema, origSchema, true, false)); + assertTrue(rsSchema.getSortKeys()[0]==0); + assertTrue(rsSchema.getSortKeys()[1]==1); + assertTrue(rsSchema.getSortKeyOrders()[0]==ResourceSchema.Order.ASCENDING); + assertTrue(rsSchema.getSortKeyOrders()[1]==ResourceSchema.Order.DESCENDING); + } + + /** * Test that Pig Schema is correctly created given a * ResourceSchema and vice versa. Test also that * TwoLevelAccess flag is set for Pig Schema when needed.