Author: pradeepkth Date: Tue Oct 20 20:42:37 2009 New Revision: 827786 URL: http://svn.apache.org/viewvc?rev=827786&view=rev Log: PIG-976: Multi-query optimization throws ClassCastException (rding via pradeepkth)
Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Oct 20 20:42:37 2009 @@ -63,6 +63,9 @@ BUG FIXES +PIG-976: Multi-query optimization throws ClassCastException (rding via +pradeepkth) + PIG-858: Order By followed by "replicated" join fails while compiling MR-plan from physical plan (ashutoshc via gates) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Oct 20 20:42:37 2009 @@ -106,6 +106,11 @@ DISTINCT }; private int mKeyField = -1; + + // This array tracks the positions of the group key in the output tuples + // of the foreach clause. This needs to be revisited when combiner optimizer + // supports foreach output with parts of group key (e.g. group.$0). + private boolean[] keyFieldPositions; private byte mKeyType = 0; @@ -247,7 +252,7 @@ // as it needs to act differently than the regular // package operator. POCombinerPackage combinePack = - new POCombinerPackage(pack, bags); + new POCombinerPackage(pack, bags, keyFieldPositions); mr.combinePlan.add(combinePack); mr.combinePlan.add(cfe); mr.combinePlan.connect(combinePack, cfe); @@ -282,7 +287,7 @@ // be the POCombiner package, as it needs to act // differently than the regular package operator. POCombinerPackage newReducePack = - new POCombinerPackage(pack, bags); + new POCombinerPackage(pack, bags, keyFieldPositions); mr.reducePlan.replace(pack, newReducePack); // the replace() above only changes @@ -360,6 +365,7 @@ List<ExprType> types = new ArrayList<ExprType>(plans.size()); boolean atLeastOneAlgebraic = false; boolean noNonAlgebraics = true; + keyFieldPositions = new boolean[plans.size()]; for (int i = 0; i < plans.size(); i++) { ExprType t = algebraic(plans.get(i), flattens.get(i), i); types.add(t); @@ -412,6 +418,7 @@ if (cols != null && cols.size() == 1 && cols.get(0) == 0 && pp.getPredecessors(proj) == null) { mKeyField = field; + keyFieldPositions[field] = true; mKeyType = proj.getResultType(); } else { // It can't be a flatten except on the grouping column @@ -525,6 +532,8 @@ addKeyProject(mfe); addKeyProject(cfe); mKeyField = cPlans.size() - 1; + keyFieldPositions = new boolean[cPlans.size()]; + keyFieldPositions[mKeyField] = true; } // Change the plans on the reduce/combine foreach to project from the column @@ -925,5 +934,6 @@ private void resetState() { mKeyField = -1; mKeyType = 0; + keyFieldPositions = null; } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Tue Oct 20 20:42:37 2009 @@ -600,9 +600,11 @@ pkg.addPackage(pk); } + boolean[] keyPos = pk.getKeyPositionsInTuple(); + PODemux demux = (PODemux)to.getLeaves().get(0); for (int i=initial; i<current; i++) { - demux.addPlan(from, mapKeyType); + demux.addPlan(from, mapKeyType, keyPos); } if (demux.isSameMapKeyType()) { @@ -752,11 +754,13 @@ pkg.setKeyType(cpk.getKeyType()); + boolean[] keyPos = cpk.getKeyPositionsInTuple(); + // See comment above for why we replicated the Package // in the from plan - for the same reason, we replicate // the Demux operators now. for (int i=initial; i<current; i++) { - demux.addPlan(from, mapKeyType); + demux.addPlan(from, mapKeyType, keyPos); } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java Tue Oct 20 20:42:37 2009 @@ -17,6 +17,7 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,9 +59,12 @@ private static TupleFactory mTupleFactory = TupleFactory.getInstance(); private boolean[] mBags; // For each field, indicates whether or not it - - private Map<Integer, Integer> keyLookup; // needs to be put in a bag. + + private boolean[] keyPositions; + + private Map<Integer, Integer> keyLookup; + /** * A new POPostCombinePackage will be constructed as a near clone of the @@ -68,8 +72,10 @@ * @param pkg POPackage to clone. * @param bags for each field, indicates whether it should be a bag (true) * or a simple field (false). + * @param keyPos for each field in the output tuple of the foreach operator, + * indicates whether it's the group key. */ - public POCombinerPackage(POPackage pkg, boolean[] bags) { + public POCombinerPackage(POPackage pkg, boolean[] bags, boolean[] keyPos) { super(new OperatorKey(pkg.getOperatorKey().scope, NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)), pkg.getRequestedParallelism(), pkg.getInputs()); @@ -80,7 +86,12 @@ for (int i = 0; i < pkg.inner.length; i++) { inner[i] = true; } - mBags = bags; + if (bags != null) { + mBags = Arrays.copyOf(bags, bags.length); + } + if (keyPos != null) { + keyPositions = Arrays.copyOf(keyPos, keyPos.length); + } } @Override @@ -129,7 +140,7 @@ // the value (tup) that we have currently for(int i = 0; i < mBags.length; i++) { Integer keyIndex = keyLookup.get(i); - if(keyIndex == null) { + if(keyIndex == null && mBags[i]) { // the field for this index is not the // key - so just take it from the "value" // we were handed - Currently THIS HAS TO BE A BAG @@ -159,5 +170,10 @@ return r; } + + @Override + public boolean[] getKeyPositionsInTuple() { + return keyPositions.clone(); + } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Tue Oct 20 20:42:37 2009 @@ -23,7 +23,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -86,6 +85,15 @@ * to "unwrap" the tuple to get to the key */ private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>(); + + /** + * The list tracks the field position of the key in the input tuple so that + * the right values are "unwrapped" to get the key. + * The tuples emitted from POCombinerPackages always have keys in a fixed + * position, but this position varies depending on the Pig Latin scripts. + */ + private ArrayList<boolean[]> keyPositions = new ArrayList<boolean[]>(); + /* * Flag indicating when a new pull should start */ @@ -215,13 +223,14 @@ * * @param inPlan plan to be appended to the inner plan list */ - public void addPlan(PhysicalPlan inPlan, byte mapKeyType) { + public void addPlan(PhysicalPlan inPlan, byte mapKeyType, boolean[] keyPos) { myPlans.add(inPlan); processedSet.set(myPlans.size()-1); // if mapKeyType is already a tuple, we will NOT // be wrapping it in an extra tuple. If it is not // a tuple, we will wrap into in a tuple isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true); + keyPositions.add(keyPos); } @Override @@ -339,24 +348,37 @@ private PhysicalOperator attachInputWithIndex(Tuple res) throws ExecException { - // unwrap the key to get the wrapped value which - // is expected by the inner plans - PigNullableWritable key = (PigNullableWritable)res.get(0); + // unwrap the first field of the tuple to get the wrapped value which + // is expected by the inner plans, as well as the index of the associated + // inner plan. + PigNullableWritable fld = (PigNullableWritable)res.get(0); // choose an inner plan to run based on the index set by // the POLocalRearrange operator and passed to this operator // by POMultiQueryPackage - int index = key.getIndex(); + int index = fld.getIndex(); index &= idxPart; index -= baseIndex; PhysicalPlan pl = myPlans.get(index); if (!(pl.getRoots().get(0) instanceof PODemux)) { if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) { - Tuple tup = (Tuple)key.getValueAsPigType(); - res.set(0, tup.get(0)); + + // unwrap the keys + boolean[] keys = keyPositions.get(index); + for (int pos = 0; pos < keys.length; pos++) { + if (keys[pos]) { + Tuple tup = (pos == 0) ? + (Tuple)fld.getValueAsPigType() : (Tuple)res.get(pos); + res.set(pos, tup.get(0)); + } + else if (pos == 0) { + res.set(0, fld.getValueAsPigType()); + } + } + } else { - res.set(0, key.getValueAsPigType()); + res.set(0, fld.getValueAsPigType()); } } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Tue Oct 20 20:42:37 2009 @@ -30,9 +30,11 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.NullableUnknownWritable; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.backend.hadoop.HDataType; /** * The package operator that packages the globally rearranged tuples @@ -168,7 +170,9 @@ @Override public Result getNext(Tuple t) throws ExecException { - int index = myKey.getIndex(); + byte origIndex = myKey.getIndex(); + + int index = (int)origIndex; index &= idxPart; index -= baseIndex; @@ -187,18 +191,33 @@ Tuple tuple = (Tuple)res.result; - // the key present in the first field - // of the tuple above is the real key without + // the object present in the first field + // of the tuple above is the real data without // index information - this is because the - // package above, extracts the real key out of - // the PigNullableWritable key - we are going to + // package above, extracts the real data out of + // the PigNullableWritable object - we are going to // give this result tuple to a PODemux operator - // which needs a PigNullableWritable key so - // it can figure out the index - we already have - // the PigNullableWritable key cachec in "myKey" - // let's send this in the result tuple - tuple.set(0, myKey); - + // which needs a PigNullableWritable first field so + // it can figure out the index. Therefore we need + // to add index to the first field of the tuple. + + Object obj = tuple.get(0); + if (obj instanceof PigNullableWritable) { + ((PigNullableWritable)obj).setIndex(origIndex); + } + else { + PigNullableWritable myObj = null; + if (obj == null) { + myObj = new NullableUnknownWritable(); + myObj.setNull(true); + } + else { + myObj = HDataType.getWritableComparableTypes(obj, (byte)0); + } + myObj.setIndex(origIndex); + tuple.set(0, myObj); + } + return res; } Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Tue Oct 20 20:42:37 2009 @@ -60,7 +60,14 @@ * */ private static final long serialVersionUID = 1L; + + private static boolean[] SIMPLE_KEY_POSITION; + static { + SIMPLE_KEY_POSITION = new boolean[1]; + SIMPLE_KEY_POSITION[0] = true; + } + //The iterator of indexed Tuples //that is typically provided by //Hadoop @@ -328,6 +335,17 @@ } /** + * Get the field positions of key in the output tuples. + * For POPackage, the position is always 0. The POCombinerPackage, + * however, can return different values. + * + * @return the field position of key in the output tuples. + */ + public boolean[] getKeyPositionsInTuple() { + return SIMPLE_KEY_POSITION.clone(); + } + + /** * Make a deep copy of this operator. * @throws CloneNotSupportedException */ Added: hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java?rev=827786&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/io/NullableUnknownWritable.java Tue Oct 20 20:42:37 2009 @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.io; + +import org.apache.hadoop.io.WritableComparable; + +/** + * This class can be used when data type is 'Unknown' and + * there is a need for PigNullableWritable object. + */ +public class NullableUnknownWritable extends PigNullableWritable { + + public NullableUnknownWritable() { + } + + public NullableUnknownWritable(WritableComparable<?> u) { + mValue = u; + } + + @Override + public Object getValueAsPigType() { + return isNull() ? null : mValue; + } +} Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=827786&r1=827785&r2=827786&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Oct 20 20:42:37 2009 @@ -18,7 +18,10 @@ package org.apache.pig.test; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.io.StringReader; import java.util.ArrayList; import java.util.Collections; @@ -42,6 +45,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -56,6 +60,7 @@ import org.junit.Before; import org.junit.Test; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.jobcontrol.Job; public class TestMultiQuery extends TestCase { @@ -107,8 +112,204 @@ Assert.fail(); } } + + @Test + public void testMultiQueryJiraPig976() { + + // test case: key ('group') isn't part of foreach output + // and keys have the same type. + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a by uid;"); + myPig.registerQuery("c = group a by gid;"); + myPig.registerQuery("d = foreach b generate SUM(a.gid);"); + myPig.registerQuery("e = foreach c generate group, COUNT(a);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryJiraPig976_2() { + + // test case: key ('group') isn't part of foreach output + // and keys have different types + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a by uname;"); + myPig.registerQuery("c = group a by gid;"); + myPig.registerQuery("d = foreach b generate SUM(a.gid);"); + myPig.registerQuery("e = foreach c generate group, COUNT(a);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryJiraPig976_3() { + + // test case: group all and key ('group') isn't part of output + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a all;"); + myPig.registerQuery("c = group a by gid;"); + myPig.registerQuery("d = foreach b generate SUM(a.gid);"); + myPig.registerQuery("e = foreach c generate group, COUNT(a);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryJiraPig976_4() { + + // test case: group by multi-cols and key ('group') isn't part of output + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a by uid;"); + myPig.registerQuery("c = group a by (uname, gid);"); + myPig.registerQuery("d = foreach b generate SUM(a.gid);"); + myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @Test + public void testMultiQueryJiraPig976_5() { + + // test case: key ('group') in multiple positions. + + try { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a by uid;"); + myPig.registerQuery("c = group a by (uname, gid);"); + myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group;"); + myPig.registerQuery("d1 = foreach d generate $1 + $2;"); + myPig.registerQuery("e = foreach c generate group, COUNT(a);"); + myPig.registerQuery("store d1 into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } @Test + public void testMultiQueryJiraPig976_6() { + + // test case: key ('group') has null values. + + String INPUT_FILE = "pig-976.txt"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("apple\tapple\t100\t10"); + w.println("apple\tapple\t\t20"); + w.println("orange\torange\t100\t10"); + w.println("orange\torange\t\t20"); + w.println("strawberry\tstrawberry\t300\t10"); + + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("a = load '" + INPUT_FILE + + "' as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b = group a by uid;"); + myPig.registerQuery("c = group a by gid;"); + myPig.registerQuery("d = foreach b generate group, SUM(a.gid);"); + myPig.registerQuery("e = foreach c generate COUNT(a), group;"); + myPig.registerQuery("store d into '/tmp/output1';"); + myPig.registerQuery("store e into '/tmp/output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + assertTrue(jobs.size() == 2); + + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + } + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + + } + } + + @Test public void testMultiQueryWithTwoStores2() { System.out.println("===== multi-query with 2 stores (2) =====");