Author: gates
Date: Mon Feb 25 13:15:25 2008
New Revision: 630997

URL: http://svn.apache.org/viewvc?rev=630997&view=rev
Log:
PIG-110: Replaced code accidentally merged out in PIG-32 fix that handled 
flattening the combiner case.


Added:
    incubator/pig/trunk/test/org/apache/pig/test/TestCombiner.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=630997&r1=630996&r2=630997&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Mon Feb 25 13:15:25 2008
@@ -127,3 +127,6 @@
        be run w/o access to a hadoop cluster. (xuzh via gates)
 
     PIG-68: improvements to build.xml (joa23 via olgan)
+
+       PIG-110: Replaced code accidentally merged out in PIG-32 fix that handled
+       flattening the combiner case. (gates and oae)

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=630997&r1=630996&r2=630997&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Mon Feb 25 13:15:25 2008
@@ -498,11 +498,23 @@
     private class CombineAdjuster extends EvalSpecVisitor {
         private int position = 0;
 
+        //We don't want to be performing any flattening in the combiner since the column numbers in
+        //the reduce spec assume that there is no combiner. If the combiner performs flattening, the column
+        //numbers get messed up. For now, since combiner works only with generate group, func1(), func2(),...,
+        //it suffices to write visitors for those eval spec types.
+
         public void visitFuncEval(FuncEvalSpec fe) {
             // Reset the function to call the initial instance of itself
             // instead of the general instance.
             fe.resetFuncToInitial();
+            fe.setFlatten(false);
         }
+
+        @Override
+        public void visitProject(ProjectSpec p) {
+            p.setFlatten(false);
+        }
+
     }
 
     private class CombineDeterminer extends EvalSpecVisitor {

Added: incubator/pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=630997&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestCombiner.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestCombiner.java Mon Feb 25 13:15:25 2008
@@ -0,0 +1,63 @@
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+
+public class TestCombiner extends TestCase {
+
+    @Test
+    public void testLocal() throws Exception {
+        // run the test locally
+        runTest(new PigServer("local"));
+    }
+
+    @Test
+    public void testOnCluster() throws Exception {
+        // run the test on cluster
+        MiniCluster.buildCluster();
+        runTest(new PigServer("mapreduce"));
+
+    }
+
+    private void runTest(PigServer pig) throws IOException {
+        List<String> inputLines = new ArrayList<String>();
+        inputLines.add("a,b,1");
+        inputLines.add("a,b,1");
+        inputLines.add("a,c,1");
+        loadWithTestLoadFunc("A", pig, inputLines);
+
+        pig.registerQuery("B = group A by ($0, $1);");
+        pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);");
+        Iterator<Tuple> resultIterator = pig.openIterator("C");
+        Tuple tuple = resultIterator.next();
+        assertEquals("(a, b, 2)", tuple.toString());
+        tuple = resultIterator.next();
+        assertEquals("(a, c, 1)", tuple.toString());
+    }
+
+    private void loadWithTestLoadFunc(String loadAlias, PigServer pig,
+            List<String> inputLines) throws IOException {
+        File inputFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(inputFile));
+        for (String line : inputLines) {
+            ps.println(line);
+        }
+        ps.close();
+        pig.registerQuery(loadAlias + " = load 'file:"
+                + inputFile + "' using "
+                + PigStorage.class.getName() + "(',');");
+    }
+
+}


Reply via email to