Author: rding
Date: Thu Jan 28 18:27:07 2010
New Revision: 904202

URL: http://svn.apache.org/viewvc?rev=904202&view=rev
Log:
PIG-1204: Pig hangs when joining two streaming relations in local mode

Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=904202&r1=904201&r2=904202&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Thu Jan 28 18:27:07 2010 @@ -78,6 +78,9 @@ BUG FIXES +PIG-1204: Pig hangs when joining two streaming relations in local mode +(rding) + PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER, FORACH (pradeepkth via gates) Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=904202&r1=904201&r2=904202&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Jan 28 18:27:07 2010 @@ -40,6 +40,8 @@ public class POStream extends PhysicalOperator { private static final long serialVersionUID = 2L; + + private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null); private String executableManagerStr; // String representing ExecutableManager to use transient private ExecutableManager executableManager; // ExecutableManager to use @@ -155,7 +157,7 @@ // getNext() in POStream should never be called. 
So // we don't need to set any flag noting we saw all output // from binary - r.returnStatus = POStatus.STATUS_EOP; + r = EOP_RESULT; } return(r); } @@ -190,7 +192,7 @@ // getNext() in POStream should never be called. So // we don't need to set any flag noting we saw all output // from binary - r.returnStatus = POStatus.STATUS_EOP; + r = EOP_RESULT; } } @@ -204,7 +206,7 @@ // So once we send this EOP down, getNext() in POStream // should never be called. So we don't need to set any // flag noting we saw all output from binary - r.returnStatus = POStatus.STATUS_EOP; + r = EOP_RESULT; } return r; } else { @@ -218,7 +220,7 @@ // So we can send an EOP to the successor in // the pipeline and also note this condition // for future calls - r.returnStatus = POStatus.STATUS_EOP; + r = EOP_RESULT; allOutputFromBinaryProcessed = true; } return r; Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=904202&r1=904201&r2=904202&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Thu Jan 28 18:27:07 2010 @@ -18,6 +18,8 @@ package org.apache.pig.test; import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; import junit.framework.TestCase; @@ -285,6 +287,39 @@ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); } } + + @Test + public void testJoinTwoStreamingRelations() + throws Exception { + ArrayList<String> list = new ArrayList<String>(); + for (int i=0; i<10000; i++) { + list.add("A," + i); + } + File input = Util.createInputFile("tmp", "", list.toArray(new String[0])); + + // Expected results + Tuple expected = DefaultTupleFactory.getInstance().newTuple(4); + expected.set(0, "A"); + expected.set(1, 0); + expected.set(2, "A"); + 
expected.set(3, 0); + + pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',') as (a0, a1);"); + pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);"); + pigServer.registerQuery("C = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + + PigStorage.class.getName() + "(',') as (a0, a1);"); + pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);"); + pigServer.registerQuery("E = join B by a0, D by a0;"); + + Iterator<Tuple> iter = pigServer.openIterator("E"); + int count = 0; + while (iter.hasNext()) { + Assert.assertEquals(expected.toString(), iter.next().toString()); + count++; + } + Assert.assertTrue(count == 1); + } @Test public void testLocalNegativeLoadStoreOptimization() throws Exception {