Author: olga
Date: Tue Jun 16 20:27:38 2009
New Revision: 785378

URL: http://svn.apache.org/viewvc?rev=785378&view=rev
Log:
PIG-849: Local engine loses records in splits (hagleitn via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 16 20:27:38 2009
@@ -28,6 +28,11 @@
 
 BUG FIXES
 
+    PIG-852: pig -version or pig -help returns exit code of 1 (milindb via
+    olgan)
+
+    PIG-849: Local engine loses records in splits (hagleitn via olgan)
+
 Release 0.3.0 - Unreleased
 
 INCOMPATIBLE CHANGES
@@ -692,5 +697,3 @@
 
     PIG-284: target for building source jar (oae via olgan)
 
-    PIG-852: pig -version or pig -help returns exit code of 1 (milindb via
-    olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POSplit.java Tue Jun 16 20:27:38 2009
@@ -68,7 +68,10 @@
                if(input.returnStatus == POStatus.STATUS_ERR) {
                    throw new ExecException("Error accumulating output at local Split operator");
                }
-               data.add((Tuple) input.result);
+
+                if (input.returnStatus != POStatus.STATUS_NULL) {
+                    data.add((Tuple) input.result);
+                }
            }
            processingDone = true;
        }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java?rev=785378&r1=785377&r2=785378&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java Tue Jun 16 20:27:38 2009
@@ -67,6 +67,8 @@
 
     @Test
     public void testSplit() throws IOException, VisitorException, ExecException {
+        init();
+
         pigContext.connect();
         File datFile = File.createTempFile("tempA", ".dat");
 
@@ -134,6 +136,76 @@
         }
     }
 
+    @Test
+    public void testSplitNulls() throws IOException, VisitorException, ExecException {
+        init();
+
+        pigContext.connect();
+        File datFile = File.createTempFile("tempN1", ".dat");
+        String path1 = Util.encodeEscape(datFile.getAbsolutePath());
+
+        FileOutputStream dat = new FileOutputStream(datFile);
+        String s1 = "1\n2\n3\n42\n4\n5\n";
+        dat.write(s1.getBytes());
+        dat.close();
+
+        String s2 = "1\n2\n43\n3\n4\n5\n";
+
+        datFile = File.createTempFile("tempN2", ".dat");
+        String path2 = Util.encodeEscape(datFile.getAbsolutePath());
+                
+        dat = new FileOutputStream(datFile);
+        dat.write(s2.getBytes());
+        dat.close();
+
+        String query = "a = load '"+path1+"'; b = load '"+path2+"'; "+
+            "c = cogroup a by $0, b by $0; d = foreach c generate $0, flatten($1), flatten($2); "+
+            "split d into e if 1==1, f if 1==1;";
+
+        LogicalPlan plan = buildPlan(query);
+        PhysicalPlan pp = buildPhysicalPlan(plan);
+
+        DataBag[] bag = new DataBag[pp.getLeaves().size()];
+
+        for (int i = 0; i < bag.length; i++) {
+            bag[i] = BagFactory.getInstance().newDefaultBag();
+        }
+
+        for (int i = 0; i < pp.getLeaves().size(); i++) {
+            System.out.println("Leaves: "+i);
+            Tuple t = null;
+            for (Result res = pp.getLeaves().get(i).getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = pp
+                    .getLeaves().get(i).getNext(t)) {
+                if (res.returnStatus == POStatus.STATUS_OK)
+                    bag[i].add((Tuple) res.result);
+                System.out.println("Split: "+res.result);
+            }
+        }
+        
+        Map<DataByteArray, Integer> seen = new HashMap<DataByteArray, Integer>();
+        seen.put(new DataByteArray("1".getBytes()), new Integer(0));
+        seen.put(new DataByteArray("2".getBytes()), new Integer(0));
+        seen.put(new DataByteArray("3".getBytes()), new Integer(0));
+        seen.put(new DataByteArray("4".getBytes()), new Integer(0));
+        seen.put(new DataByteArray("5".getBytes()), new Integer(0));
+        
+        for (int i = 0; i < bag.length; i++) {
+            DataByteArray value = null;
+            Iterator<Tuple> it = bag[i].iterator();
+            while (it.hasNext()) {
+                Tuple t = it.next();
+                System.out.println("Value: "+t);
+                value = (DataByteArray) t.get(0);
+                Integer count = seen.get(value);
+                seen.put(value, ++count);
+            }
+        }
+
+        for (Integer j: seen.values()) {
+            assertEquals(j, new Integer(2));
+        }
+    }
+
     public PhysicalPlan buildPhysicalPlan(LogicalPlan lp)
             throws VisitorException {
         LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor(
@@ -153,8 +225,18 @@
         LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
 
         try {
-            LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
-                                           logicalOpTable, aliasOp, fileNameMap);
+            String[] qs = query.split(";");
+            LogicalPlan lp = null;
+            for (String q: qs) {
+                q = q.trim();
+                if (q.equals(""))
+                    continue;
+                q += ";";
+                System.out.println(q);
+                lp = builder.parse("Test-Plan-Builder", q, aliases,
+                                   logicalOpTable, aliasOp, fileNameMap);
+            }
+
             List<LogicalOperator> roots = lp.getRoots();
 
             if (roots.size() > 0) {
@@ -238,8 +320,15 @@
 
     }
 
-    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-    Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
-    Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
-    Map<String, String> fileNameMap = new HashMap<String, String>();
+    private void init() {
+        aliases = new HashMap<LogicalOperator, LogicalPlan>();
+        logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+        aliasOp = new HashMap<String, LogicalOperator>();
+        fileNameMap = new HashMap<String, String>();
+    }
+
+    Map<LogicalOperator, LogicalPlan> aliases;
+    Map<OperatorKey, LogicalOperator> logicalOpTable;
+    Map<String, LogicalOperator> aliasOp;
+    Map<String, String> fileNameMap;
 }


Reply via email to