Author: rding
Date: Thu Jan 28 18:27:07 2010
New Revision: 904202

URL: http://svn.apache.org/viewvc?rev=904202&view=rev
Log:
PIG-1204: Pig hangs when joining two streaming relations in local mode

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 28 18:27:07 2010
@@ -78,6 +78,9 @@
 
 BUG FIXES
 
+PIG-1204:  Pig hangs when joining two streaming relations in local mode
+(rding)
+
 PIG-1191:  POCast throws exception for certain sequences of LOAD, FILTER,
                        FORACH (pradeepkth via gates)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Jan 28 18:27:07 2010
@@ -40,6 +40,8 @@
 
 public class POStream extends PhysicalOperator {
     private static final long serialVersionUID = 2L;
+    
+    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
 
     private String executableManagerStr;            // String representing ExecutableManager to use
     transient private ExecutableManager executableManager;    // ExecutableManager to use
@@ -155,7 +157,7 @@
                     // getNext() in POStream should never be called. So
                     // we don't need to set any flag noting we saw all output
                     // from binary
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                 }
                 return(r);
             }
@@ -190,7 +192,7 @@
                             // getNext() in POStream should never be called. So
                             // we don't need to set any flag noting we saw all output
                             // from binary
-                            r.returnStatus = POStatus.STATUS_EOP;
+                            r = EOP_RESULT;
                         }
                     }
                     
@@ -204,7 +206,7 @@
                     // So once we send this EOP down, getNext() in POStream
                     // should never be called. So we don't need to set any 
                     // flag noting we saw all output from binary
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                 }
                 return r;
             } else {
@@ -218,7 +220,7 @@
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r.returnStatus = POStatus.STATUS_EOP;
+                    r = EOP_RESULT;
                     allOutputFromBinaryProcessed  = true;
                 }
                 return r;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java?rev=904202&r1=904201&r2=904202&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestStreamingLocal.java Thu Jan 28 18:27:07 2010
@@ -18,6 +18,8 @@
 package org.apache.pig.test;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
 
 import junit.framework.TestCase;
 
@@ -285,6 +287,39 @@
             Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
         }
     }
+    
+    @Test
+    public void testJoinTwoStreamingRelations() 
+    throws Exception {
+        ArrayList<String> list = new ArrayList<String>();
+        for (int i=0; i<10000; i++) {
+            list.add("A," + i);
+        }
+        File input = Util.createInputFile("tmp", "", list.toArray(new String[0]));
+        
+        // Expected results
+        Tuple expected = DefaultTupleFactory.getInstance().newTuple(4);
+        expected.set(0, "A");
+        expected.set(1, 0);
+        expected.set(2, "A");
+        expected.set(3, 0);        
+ 
+        pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+                    PigStorage.class.getName() + "(',') as (a0, a1);");
+        pigServer.registerQuery("B = stream A through `head -1` as (a0, a1);");
+        pigServer.registerQuery("C = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+                PigStorage.class.getName() + "(',') as (a0, a1);");
+        pigServer.registerQuery("D = stream C through `head -1` as (a0, a1);");
+        pigServer.registerQuery("E = join B by a0, D by a0;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        int count = 0;
+        while (iter.hasNext()) {
+            Assert.assertEquals(expected.toString(), iter.next().toString());
+            count++;
+        }
+        Assert.assertTrue(count == 1);
+    }
 
     @Test
     public void testLocalNegativeLoadStoreOptimization() throws Exception {


Reply via email to