Author: olga Date: Fri Oct 24 15:55:24 2008 New Revision: 707776 URL: http://svn.apache.org/viewvc?rev=707776&view=rev Log: PIG-508: problems with double joins
Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=707776&r1=707775&r2=707776&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Fri Oct 24 15:55:24 2008 @@ -301,3 +301,5 @@ PIG-499: parser issue with as (sms via olgan) PIG-507: permission error not reported (pradeepk via olgan) + + PIG-508: problem with double joins (pradeepk via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=707776&r1=707775&r2=707776&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 24 15:55:24 2008 @@ -809,7 +809,8 @@ POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope))); pkg.setKeyType(DataType.TUPLE); - pkg.setNumInps(0); + pkg.setDistinct(true); + pkg.setNumInps(1); boolean[] inner = {false}; pkg.setInner(inner); curMROp.reducePlan.add(pkg); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=707776&r1=707775&r2=707776&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Oct 24 15:55:24 2008 @@ -58,7 +58,6 @@ // OR in the reduce plan. POPostCombinerPackage could // be present only in the reduce plan. Search in these two // plans accordingly - if(!mr.combinePlan.isEmpty()) { PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan); pkgDiscoverer.visit(); @@ -76,7 +75,7 @@ // if the POPackage is actually a POPostCombinerPackage, then we should // just look for the corresponding LocalRearrange(s) in the combine plan if(pkg instanceof POPostCombinerPackage) { - if(!patchPackage(mr.combinePlan, pkg)) { + if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) { throw new VisitorException("Unexpected problem while trying " + "to optimize (could not find LORearrange in combine plan)"); } @@ -91,26 +90,31 @@ private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException { // the LocalRearrange(s) could either be in the map of this MapReduceOper // OR in the reduce of predecessor MapReduceOpers - if(!patchPackage(mr.mapPlan, pkg)) { + int lrFound = 0; + + lrFound = patchPackage(mr.mapPlan, pkg); + if(lrFound != pkg.getNumInps()) { // we did not find the LocalRearrange(s) in the map plan // let's look in the predecessors List<MapReduceOper> preds = this.mPlan.getPredecessors(mr); for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) { MapReduceOper mrOper = it.next(); - if(!patchPackage(mrOper.reducePlan, pkg)) { - throw new VisitorException("Unexpected problem while trying " + - "to optimize (could not find LORearrange in predecessor's reduce plan)"); + lrFound += patchPackage(mrOper.reducePlan, pkg); + if(lrFound == pkg.getNumInps()) { + break; } } } + if(lrFound != pkg.getNumInps()) + throw new VisitorException("Unexpected problem while trying to optimize (Could not find all LocalRearranges)"); } - private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException { + private int patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException { LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg); lrDiscoverer.visit(); // let our caller know if we managed to patch // the package - return lrDiscoverer.isLoRearrangeFound(); + return lrDiscoverer.getLoRearrangeFound(); } /** @@ -161,7 +165,7 @@ */ class LoRearrangeDiscoverer extends PhyPlanVisitor { - private boolean loRearrangeFound = false; + private int loRearrangeFound = 0; private POPackage pkg; public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) { @@ -174,14 +178,22 @@ */ @Override public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { - loRearrangeFound = true; + loRearrangeFound++; Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo; // annotate the package with information from the LORearrange // update the keyInfo information if already present in the POPackage keyInfo = pkg.getKeyInfo(); if(keyInfo == null) keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); + + if(keyInfo.get(lrearrange.getIndex()) != null) { + // something is wrong - we should not be getting key info + // for the same index from two different Local Rearranges + throw new VisitorException("Unexpected problem while trying " + + "to optimize (found same index:" + lrearrange.getIndex() + + " in multiple Local Rearrange operators"); + } keyInfo.put(new Integer(lrearrange.getIndex()), new Pair<Boolean, Map<Integer, Integer>>( lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); @@ -192,7 +204,7 @@ /** * @return the loRearrangeFound */ - public boolean isLoRearrangeFound() { + public int getLoRearrangeFound() { return loRearrangeFound; } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=707776&r1=707775&r2=707776&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Oct 24 15:55:24 2008 @@ -84,6 +84,10 @@ //on a particular input boolean[] inner; + // flag to denote whether there is a distinct + // leading to this package + protected boolean distinct = false; + // A mapping of input index to key information got from LORearrange // for that index. The Key information is a pair of boolean, Map. // The boolean indicates whether there is a lone project(*) in the @@ -182,97 +186,36 @@ */ @Override public Result getNext(Tuple t) throws ExecException { - //Create numInputs bags - DataBag[] dbs = null; - if (numInputs > 0) { + Tuple res; + if(distinct) { + // only set the key which has the whole + // tuple + res = mTupleFactory.newTuple(1); + res.set(0, key); + } else { + //Create numInputs bags + DataBag[] dbs = null; dbs = new DataBag[numInputs]; for (int i = 0; i < numInputs; i++) { dbs[i] = mBagFactory.newDefaultBag(); } - } - - //For each indexed tup in the inp, sort them - //into their corresponding bags based - //on the index - while (tupIter.hasNext()) { - NullableTuple ntup = tupIter.next(); - // Need to make a copy of the value, as hadoop uses the same ntup - // to represent each value. - Tuple val = (Tuple)ntup.getValueAsPigType(); - /* - Tuple copy = mTupleFactory.newTuple(val.size()); - for (int i = 0; i < val.size(); i++) { - copy.set(i, val.get(i)); - } - */ - Tuple copy = null; - // The "value (val)" that we just got may not - // be the complete "value". It may have some portions - // in the "key" (look in POLocalRearrange for more comments) - // If this is the case we need to stitch - // the "value" together. - int index = ntup.getIndex(); - Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = - keyInfo.get(index); - boolean isProjectStar = lrKeyInfo.first; - Map<Integer, Integer> keyLookup = lrKeyInfo.second; - int keyLookupSize = keyLookup.size(); - - if( keyLookupSize > 0) { - - // we have some fields of the "value" in the - // "key". - copy = mTupleFactory.newTuple(); - int finalValueSize = keyLookupSize + val.size(); - int valIndex = 0; // an index for accessing elements from - // the value (val) that we have currently - for(int i = 0; i < finalValueSize; i++) { - Integer keyIndex = keyLookup.get(i); - if(keyIndex == null) { - // the field for this index is not in the - // key - so just take it from the "value" - // we were handed - copy.append(val.get(valIndex)); - valIndex++; - } else { - // the field for this index is in the key - if(isKeyTuple) { - // the key is a tuple, extract the - // field out of the tuple - copy.append(keyAsTuple.get(keyIndex)); - } else { - copy.append(key); - } - } - } - - } else if (isProjectStar) { - - log.info("In project star, keyAsTuple:" + keyAsTuple); - // the whole "value" is present in the "key" - copy = mTupleFactory.newTuple(keyAsTuple.getAll()); - - } else { - - // there is no field of the "value" in the - // "key" - so just make a copy of what we got - // as the "value" - copy = mTupleFactory.newTuple(val.getAll()); - + //For each indexed tup in the inp, sort them + //into their corresponding bags based + //on the index + while (tupIter.hasNext()) { + NullableTuple ntup = tupIter.next(); + int index = ntup.getIndex(); + Tuple copy = getValueTuple(ntup, index); + dbs[index].add(copy); + if(reporter!=null) reporter.progress(); } - if (numInputs > 0) dbs[index].add(copy); - if(reporter!=null) reporter.progress(); - } - - //Construct the output tuple by appending - //the key and all the above constructed bags - //and return it. - Tuple res; - res = mTupleFactory.newTuple(numInputs+1); - res.set(0,key); - if (numInputs > 0) { + //Construct the output tuple by appending + //the key and all the above constructed bags + //and return it. + res = mTupleFactory.newTuple(numInputs+1); + res.set(0,key); int i=-1; for (DataBag bag : dbs) { if(inner[++i]){ @@ -293,6 +236,73 @@ return r; } + protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException { + // Need to make a copy of the value, as hadoop uses the same ntup + // to represent each value. + Tuple val = (Tuple)ntup.getValueAsPigType(); + /* + Tuple copy = mTupleFactory.newTuple(val.size()); + for (int i = 0; i < val.size(); i++) { + copy.set(i, val.get(i)); + } + */ + + Tuple copy = null; + // The "value (val)" that we just got may not + // be the complete "value". It may have some portions + // in the "key" (look in POLocalRearrange for more comments) + // If this is the case we need to stitch + // the "value" together. + Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = + keyInfo.get(index); + boolean isProjectStar = lrKeyInfo.first; + Map<Integer, Integer> keyLookup = lrKeyInfo.second; + int keyLookupSize = keyLookup.size(); + + if( keyLookupSize > 0) { + + // we have some fields of the "value" in the + // "key". + copy = mTupleFactory.newTuple(); + int finalValueSize = keyLookupSize + val.size(); + int valIndex = 0; // an index for accessing elements from + // the value (val) that we have currently + for(int i = 0; i < finalValueSize; i++) { + Integer keyIndex = keyLookup.get(i); + if(keyIndex == null) { + // the field for this index is not in the + // key - so just take it from the "value" + // we were handed + copy.append(val.get(valIndex)); + valIndex++; + } else { + // the field for this index is in the key + if(isKeyTuple) { + // the key is a tuple, extract the + // field out of the tuple + copy.append(keyAsTuple.get(keyIndex)); + } else { + copy.append(key); + } + } + } + + } else if (isProjectStar) { + + // the whole "value" is present in the "key" + copy = mTupleFactory.newTuple(keyAsTuple.getAll()); + + } else { + + // there is no field of the "value" in the + // "key" - so just make a copy of what we got + // as the "value" + copy = mTupleFactory.newTuple(val.getAll()); + + } + return copy; + } + public byte getKeyType() { return keyType; } @@ -340,5 +350,19 @@ return keyInfo; } + /** + * @return the distinct + */ + public boolean isDistinct() { + return distinct; + } + + /** + * @param distinct the distinct to set + */ + public void setDistinct(boolean distinct) { + this.distinct = distinct; + } + } Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=707776&r1=707775&r2=707776&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Oct 24 15:55:24 2008 @@ -49,6 +49,7 @@ import org.apache.pig.impl.io.PigFile; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.Pair; import junit.framework.TestCase; @@ -659,4 +660,37 @@ assertEquals(1, t.get(2)); assertEquals(Integer.class, t.get(2).getClass()); } + + @Test + public void testCogroupWithInputFromGroup() throws IOException, ExecException { + // Create input file with ascii data + File input = Util.createInputFile("tmp", "", + new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2", + "pigtester2\t10\t1.2", + "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"}); + + Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>(); + // we will in essence be doing a group on first column and getting + // SUM over second column and a count for the group - store + // the results for the three groups above so we can check the output + resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L)); + resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L)); + resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L)); + + pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " + + "as (name:chararray, age:int, gpa:double);"); + pigServer.registerQuery("b = group a by name;"); + pigServer.registerQuery("c = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " + + "as (name:chararray, age:int, gpa:double);"); + pigServer.registerQuery("d = cogroup b by group, c by name;"); + pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);"); + Iterator<Tuple> it = pigServer.openIterator("e"); + for(int i = 0; i < resultMap.size(); i++) { + Tuple t = it.next(); + assertEquals(true, resultMap.containsKey(t.get(0))); + Pair<Long, Long> output = resultMap.get(t.get(0)); + assertEquals(output.first, t.get(1)); + assertEquals(output.second, t.get(2)); + } + } }