Author: pisong
Date: Tue Jul 22 05:11:17 2008
New Revision: 678729

URL: http://svn.apache.org/viewvc?rev=678729&view=rev
Log:
PIG-319 Modified POUnion to allow injected input

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java?rev=678729&r1=678728&r2=678729&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java Tue Jul 22 05:11:17 2008
@@ -33,6 +33,11 @@
         returnStatus = POStatus.STATUS_ERR;
         result = null;
     }
+    
+    /**
+     * Creates a Result with an explicit status and payload.
+     *
+     * @param returnStatus the status code (callers pass POStatus constants)
+     * @param result the payload carried by this Result; may be null
+     */
+    public Result(byte returnStatus, Object result) {
+        this.result = result;
+        this.returnStatus = returnStatus;
+    }
 
     @Override
     public String toString() {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=678729&r1=678728&r2=678729&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Tue Jul 22 05:11:17 2008
@@ -50,6 +50,9 @@
     //Used for efficiently shifting between non-drained
     //inputs
     BitSet done;
+
+    // Set once an injected input has been emitted, so that the following
+    // getNext() call reports EOP instead of emitting it again.
+    boolean nextReturnEOP = false ;
+    // Single EOP Result shared by every POUnion and handed out by getNext().
+    // final so it can never be reassigned; Result's fields are public, so
+    // callers must also not mutate it.
+    // NOTE(review): confirm no downstream operator writes into a returned EOP Result.
+    private static final Result eopResult = new Result(POStatus.STATUS_EOP, null) ;
     
     //The index of the last input that was read
     int lastInd = 0;
@@ -73,7 +76,12 @@
     @Override
     public void setInputs(List<PhysicalOperator> inputs) {
         super.setInputs(inputs);
-        done = new BitSet(inputs.size());
+        // One "drained" bit per input; a null input list gets an empty set
+        // instead of failing with a NullPointerException.
+        int inputCount = (inputs == null) ? 0 : inputs.size();
+        done = new BitSet(inputCount);
     }
 
     @Override
@@ -107,37 +115,55 @@
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        while(true){
-            if (done.nextClearBit(0) >= inputs.size()) {
-                res = new Result();
-                res.returnStatus = POStatus.STATUS_EOP;
-                clearDone();
-                return res;
-            }
-            if(lastInd >= inputs.size() || done.nextClearBit(lastInd) >= inputs.size())
-                lastInd = 0;
-            int ind = done.nextClearBit(lastInd);
-            Result res;
-            
+
+        // An injected input was emitted on the previous call; report EOP now.
+        if (nextReturnEOP) {
+            nextReturnEOP = false ;
+            return eopResult ;
+        }
+
+        // Case 1 : Normal connected plan
+        // Round-robin over the inputs whose bit in 'done' is still clear,
+        // resuming just after the input that was read last.
+        if (!isInputAttached()) {
+
             while(true){
-                if(reporter!=null) reporter.progress();
-                res = inputs.get(ind).getNext(t);
-                if(res.returnStatus == POStatus.STATUS_NULL)
-                    continue;
-                
-                lastInd = ind + 1;
-                
-                if(res.returnStatus == POStatus.STATUS_ERR)
-                    return new Result();
-                
-                if (res.returnStatus == POStatus.STATUS_OK)
-                    return res;
-                
-                if (res.returnStatus == POStatus.STATUS_EOP) {
-                    done.set(ind);
-                    break;
+                // Every input has reached EOP: call clearDone() (defined
+                // elsewhere) and signal EOP to the caller.
+                if (done.nextClearBit(0) >= inputs.size()) {
+                    clearDone();
+                    return eopResult ;
+                }
+                if(lastInd >= inputs.size() || done.nextClearBit(lastInd) >= inputs.size())
+                    lastInd = 0;
+                int ind = done.nextClearBit(lastInd);
+                Result res;
+
+                while(true){
+                    if(reporter!=null) reporter.progress();
+                    res = inputs.get(ind).getNext(t);
+                    // STATUS_NULL: nothing produced this time; ask the same input again.
+                    if(res.returnStatus == POStatus.STATUS_NULL)
+                        continue;
+
+                    lastInd = ind + 1;
+
+                    if(res.returnStatus == POStatus.STATUS_ERR)
+                        return new Result();
+
+                    if (res.returnStatus == POStatus.STATUS_OK)
+                        return res;
+                    
+                    // This input is drained: mark it and pick the next live one.
+                    if (res.returnStatus == POStatus.STATUS_EOP) {
+                        done.set(ind);
+                        break;
+                    }
                 }
             }
         }
+        // Case 2 : Input directly injected
+        else {
+            // Emit the attached input once; the next call returns EOP.
+            // Assign a fresh Result rather than writing through the inherited
+            // 'res' field: getNext never initializes that field (Case 1 uses a
+            // local), so it may still be null, and mutating it in place would
+            // also change any Result previously returned from here.
+            res = new Result(POStatus.STATUS_OK, input);
+            detachInput();
+            nextReturnEOP = true ;
+            return res;
+        }
+
+
     }
 }

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java?rev=678729&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce2.java Tue Jul 22 05:11:17 2008
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Iterator;
+
+/**
+ * End-to-end checks that UNION in mapreduce mode emits every tuple of
+ * every input.
+ */
+public class TestMapReduce2 extends TestCase {
+
+    private String initString = "mapreduce";
+    // NOTE(review): never read in this class; presumably kept so the mini
+    // cluster is started before PigServer connects — confirm.
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    private PigServer pig ;
+
+    public TestMapReduce2() throws Throwable {
+        pig = new PigServer(initString) ;
+    }
+
+
+    /** Union of two plain loads yields all 30 + 50 input tuples. */
+    @Test
+    public void testUnion1() throws Exception {
+        File tmpFile1 = genDataSetFile1() ;
+        File tmpFile2 = genDataSetFile2() ;
+        pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
+        pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
+        pig.registerQuery("c = union a, b; ") ;
+
+        Assert.assertEquals(30 + 50, countTuples("c"));
+    }
+
+    /** Union of two projected (foreach) inputs yields all 30 + 50 tuples. */
+    @Test
+    public void testUnion2() throws Exception {
+        File tmpFile1 = genDataSetFile1() ;
+        File tmpFile2 = genDataSetFile2() ;
+        pig.registerQuery("a = load 'file:" + tmpFile1 + "'; ") ;
+        pig.registerQuery("b = load 'file:" + tmpFile2 + "'; ") ;
+        pig.registerQuery("a1 = foreach a generate $0, $1; ") ;
+        pig.registerQuery("b1 = foreach b generate $0, $1; ") ;
+        pig.registerQuery("c = union a1, b1; ") ;
+
+        Assert.assertEquals(30 + 50, countTuples("c"));
+    }
+
+    /**
+     * Iterates over the given alias, printing each tuple with its index,
+     * and returns how many tuples were produced.
+     */
+    private int countTuples(String alias) throws Exception {
+        Iterator<Tuple> it = pig.openIterator(alias);
+        int count = 0 ;
+        while(it.hasNext()) {
+            Tuple t = it.next() ;
+            System.out.println(count + ":" + t) ;
+            count++ ;
+        }
+        return count ;
+    }
+
+    /** Generates the 30-row sample dataset. */
+    private File genDataSetFile1() throws IOException {
+        return genDataSetFile(30) ;
+    }
+
+    /** Generates the 50-row sample dataset. */
+    private File genDataSetFile2() throws IOException {
+        return genDataSetFile(50) ;
+    }
+
+    /**
+     * Writes a two-column sample dataset of {@code dataLength} rows to a temp
+     * file. Column 0 cycles through 0..9 and column 1 counts down from
+     * dataLength to 1; both are zero-padded to seven digits.
+     */
+    private File genDataSetFile(int dataLength) throws IOException {
+        String[][] data = new String[dataLength][] ;
+        DecimalFormat formatter = new DecimalFormat("0000000");
+
+        for (int i = 0; i < dataLength; i++) {
+            data[i] = new String[] {
+                formatter.format(i % 10),
+                formatter.format(dataLength - i)
+            } ;
+        }
+
+        return TestHelper.createTempFile(data) ;
+    }
+}


Reply via email to