Author: olga
Date: Tue Jan  5 19:37:56 2010
New Revision: 896188

URL: http://svn.apache.org/viewvc?rev=896188&view=rev
Log:
PIG-1171: Top-N queries produce incorrect results when followed by a cross 
statement (rding via olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jan  5 19:37:56 2010
@@ -74,6 +74,8 @@
 
 BUG FIXES
 
+PIG-1171: Top-N queries produce incorrect results when followed by a cross statement (rding via olgan)
+
 PIG-1159: merge join right side table does not support comma seperated paths
 (rding via olgan)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jan  5 19:37:56 2010
@@ -575,17 +575,27 @@
                     jobConf.set("pig.quantilesFile", mro.getQuantFile());
                     jobConf.setPartitionerClass(WeightedRangePartitioner.class);
                 }
-                if(mro.UDFs.size()==1){
-                    String compFuncSpec = mro.UDFs.get(0);
-                    Class comparator = PigContext.resolveClassName(compFuncSpec);
-                    if(ComparisonFunc.class.isAssignableFrom(comparator)) {
-                        jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
-                        jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
-                        jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-                        jobConf.set("pig.usercomparator", "true");
-                        jobConf.setOutputKeyClass(NullableTuple.class);
-                        jobConf.setOutputKeyComparatorClass(comparator);
+                
+                if (mro.isUDFComparatorUsed) {  
+                    boolean usercomparator = false;
+                    for (String compFuncSpec : mro.UDFs) {
+                        Class comparator = PigContext.resolveClassName(compFuncSpec);
+                        if(ComparisonFunc.class.isAssignableFrom(comparator)) {
+                            jobConf.setMapperClass(PigMapReduce.MapWithComparator.class);
+                            jobConf.setReducerClass(PigMapReduce.ReduceWithComparator.class);
+                            jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+                            jobConf.set("pig.usercomparator", "true");
+                            jobConf.setOutputKeyClass(NullableTuple.class);
+                            jobConf.setOutputKeyComparatorClass(comparator);
+                            usercomparator = true;
+                            break;
+                        }
                     }
+                    if (!usercomparator) {
+                        String msg = "Internal error. Can't find the UDF comparator";
+                        throw new IOException (msg);
+                    }
+                    
                 } else {
                     jobConf.set("pig.sortOrder",
                         ObjectSerializer.serialize(mro.getSortOrder()));

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jan  5 19:37:56 2010
@@ -1568,6 +1568,7 @@
             
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
+                curMROp.isUDFComparatorUsed = true;
             }
             phyToMROpMap.put(op, curMROp);
         }catch(Exception e){
@@ -1895,6 +1896,7 @@
        
         if(sort.isUDFComparatorUsed) {
             mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
+            curMROp.isUDFComparatorUsed = true;
         }        
     
         List<Boolean> flat1 = new ArrayList<Boolean>();         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Tue Jan  5 19:37:56 2010
@@ -107,6 +107,9 @@
 
     public List<String> UDFs;
     
+    // Indicates if a UDF comparator is used
+    boolean isUDFComparatorUsed = false;
+    
     transient NodeIdGenerator nig;
 
     private String scope;
@@ -133,7 +136,7 @@
     // Name of the partition file generated by sampling process,
     // Used by Skewed Join
        private String skewedJoinPartitionFile;
-
+       
     public MapReduceOper(OperatorKey k) {
         super(k);
         mapPlan = new PhysicalPlan();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=896188&r1=896187&r2=896188&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Tue Jan  5 19:37:56 2010
@@ -87,6 +87,63 @@
     public void tearDown() throws Exception {
         myPig = null;
     }
+
+    public void testMultiQueryJiraPig1171() {
+
+        // test case: Problems with some top N queries
+        
+        String INPUT_FILE = "abc";
+        
+        try {
+    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\tapple\t3");
+            w.println("2\torange\t4");
+            w.println("3\tpersimmon\t5");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+           
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE 
+                    + "' as (a:long, b, c);");
+            myPig.registerQuery("A1 = Order A by a desc;");
+            myPig.registerQuery("A2 = limit A1 1;");
+            myPig.registerQuery("B = load '" + INPUT_FILE 
+                    + "' as (a:long, b, c);");
+            myPig.registerQuery("B1 = Order B by a desc;");
+            myPig.registerQuery("B2 = limit B1 1;");
+            
+            myPig.registerQuery("C = cross A2, B2;");
+            
+            Iterator<Tuple> iter = myPig.openIterator("C");
+
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(3L,'persimmon',5,3L,'persimmon',5)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
     
     public void testMultiQueryJiraPig1157() {
 


Reply via email to