Author: gates Date: Thu Jul 10 11:09:36 2008 New Revision: 675663 URL: http://svn.apache.org/viewvc?rev=675663&view=rev Log: PIG-293: Previous patches fixed the infinite loop. This patch makes ORDER BY actually work in map-reduce mode.
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675663&r1=675662&r2=675663&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jul 10 11:09:36 2008 @@ -180,6 +180,7 @@ public MROperPlan compile() throws IOException, PlanException, VisitorException { List<PhysicalOperator> leaves = plan.getLeaves(); POStore store = (POStore)leaves.get(0); +System.out.println("store file is " + store.getSFile()); FileLocalizer.registerDeleteOnFail(store.getSFile().getFileName(), pigContext); compile(store); @@ -729,20 +730,27 @@ } } - private int[] getSortCols(POSort sort){ + private int[] getSortCols(POSort sort) throws PlanException { List<PhysicalPlan> plans = sort.getSortPlans(); if(plans!=null){ int[] ret = new int[plans.size()]; int i=-1; for (PhysicalPlan plan : plans) { + if (((POProject)plan.getLeaves().get(0)).isStar()) return null; ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn(); } return ret; } - return null; + log.error("No expression plan found in POSort"); + throw new PlanException("No Expression Plan found in POSort"); } - public MapReduceOper getSortJob(MapReduceOper quantJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException{ + public MapReduceOper getSortJob( + MapReduceOper quantJob, + 
FileSpec lFile, + FileSpec quantFile, + int rp, + int[] fields) throws PlanException{ MapReduceOper mro = startNew(lFile, quantJob); mro.setQuantFile(quantFile.getFileName()); mro.setGlobalSort(true); @@ -750,23 +758,31 @@ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>(); - if(fields==null) { - log.error("No Expression Plan found in POSort"); - throw new PlanException("No Expression Plan found in POSort"); - } - for (int i : fields) { + if (fields == null) { + // This is project * PhysicalPlan ep = new PhysicalPlan(); POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); - prj.setColumn(i); + prj.setStar(true); prj.setOverloaded(false); - prj.setResultType(DataType.BYTEARRAY); + prj.setResultType(DataType.TUPLE); ep.add(prj); eps1.add(ep); + } else { + for (int i : fields) { + PhysicalPlan ep = new PhysicalPlan(); + POProject prj = new POProject(new OperatorKey(scope, + nig.getNextNodeId(scope))); + prj.setColumn(i); + prj.setOverloaded(false); + prj.setResultType(DataType.BYTEARRAY); + ep.add(prj); + eps1.add(ep); + } } POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope))); lr.setIndex(0); - lr.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY); + lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY); lr.setPlans(eps1); lr.setResultType(DataType.TUPLE); mro.mapPlan.addAsLeaf(lr); @@ -774,7 +790,7 @@ mro.setMapDone(true); POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope))); - pkg.setKeyType((fields.length>1) ? DataType.TUPLE : DataType.BYTEARRAY); + pkg.setKeyType((fields == null || fields.length>1) ? 
DataType.TUPLE : DataType.BYTEARRAY); pkg.setNumInps(1); boolean[] inner = {false}; pkg.setInner(inner); @@ -810,19 +826,27 @@ List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>(); List<Boolean> flat1 = new ArrayList<Boolean>(); - if(fields==null) { - log.error("No Expression Plan found in POSort"); - throw new PlanException("No Expression Plan found in POSort"); - } - for (int i : fields) { + if (fields == null) { PhysicalPlan ep = new PhysicalPlan(); - POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); - prj.setColumn(i); + POProject prj = new POProject(new OperatorKey(scope, + nig.getNextNodeId(scope))); + prj.setStar(true); prj.setOverloaded(false); - prj.setResultType(DataType.BYTEARRAY); + prj.setResultType(DataType.TUPLE); ep.add(prj); eps1.add(ep); flat1.add(true); + } else { + for (int i : fields) { + PhysicalPlan ep = new PhysicalPlan(); + POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prj.setColumn(i); + prj.setOverloaded(false); + prj.setResultType(DataType.BYTEARRAY); + ep.add(prj); + eps1.add(ep); + flat1.add(true); + } } POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1); mro.mapPlan.addAsLeaf(nfe1); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=675663&r1=675662&r2=675663&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jul 10 11:09:36 2008 @@ -135,7 +135,9 @@ if(res.returnStatus != POStatus.STATUS_OK){ return res; } - if(columns.size() == 1) { + if (star) { + return res; + } 
else if(columns.size() == 1) { ret = inpValue.get(columns.get(0)); } else { ArrayList<Object> objList = new ArrayList<Object>(columns.size()); Modified: incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=675663&r1=675662&r2=675663&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Thu Jul 10 11:09:36 2008 @@ -43,10 +43,10 @@ | | | | | Project[bag][1] - -158 | | - | |---Package[tuple]{bytearray} - -157 - | Local Rearrange[tuple]{bytearray} - -156 + | |---Package[tuple]{tuple} - -157 + | Local Rearrange[tuple]{tuple} - -156 | | | - | | Project[bytearray][0] - -155 + | | Project[tuple][*] - -155 | | | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -153 | @@ -76,7 +76,7 @@ | | | |---New For Each(true)[tuple] - -141 | | | - | | Project[bytearray][0] - -140 + | | Project[tuple][*] - -140 | | | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.impl.builtin.RandomSampleLoader) - -138 | Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=675663&r1=675662&r2=675663&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Thu Jul 10 11:09:36 2008 @@ -744,6 +744,7 @@ public static POStore topStoreOp() { POStore ret = new POStore(new OperatorKey("", r.nextLong())); ret.setPc(pc); + ret.setSFile(new FileSpec("DummyFil", "DummyLdr")); 
return ret; }