Author: gates
Date: Mon Jul 28 18:27:14 2008
New Revision: 680594

URL: http://svn.apache.org/viewvc?rev=680594&view=rev
Log:
PIG-344 Fix sorting to work on types other than just byte array.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=680594&r1=680593&r2=680594&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Jul 28 18:27:14 2008
@@ -819,7 +819,7 @@
             int rp = op.getRequestedParallelism();
             int[] fields = getSortCols(op);
             MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, 
rp, fields);
-            curMROp = getSortJob(quant, fSpec, quantFile, rp, fields, 
op.getLimit());
+            curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
             }
@@ -846,18 +846,22 @@
     }
     
     public MapReduceOper getSortJob(
+            POSort sort,
             MapReduceOper quantJob,
             FileSpec lFile,
             FileSpec quantFile,
             int rp,
-            int[] fields,
-            long limit) throws PlanException{
+            int[] fields) throws PlanException{
         MapReduceOper mro = startNew(lFile, quantJob);
         mro.setQuantFile(quantFile.getFileName());
         mro.setGlobalSort(true);
         mro.requestedParallelism = rp;
+
+        long limit = sort.getLimit();
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
+
+        byte keyType = DataType.UNKNOWN;
         
         if (fields == null) {
             // This is project *
@@ -869,6 +873,7 @@
             ep.add(prj);
             eps1.add(ep);
         } else {
+            /*
             for (int i : fields) {
                 PhysicalPlan ep = new PhysicalPlan();
                 POProject prj = new POProject(new OperatorKey(scope,
@@ -879,21 +884,37 @@
                 ep.add(prj);
                 eps1.add(ep);
             }
+            */
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps1.addAll(sort.getSortPlans());
+
+            // Visit the first sort plan to figure out our key type.  We only
+            // have to visit the first because if we have more than one plan,
+            // then the key type will be tuple.
+            try {
+                FindKeyTypeVisitor fktv =
+                    new FindKeyTypeVisitor(sort.getSortPlans().get(0));
+                fktv.visit();
+                keyType = fktv.keyType;
+            } catch (VisitorException ve) {
+                throw new PlanException(ve);
+            }
         }
         
         POLocalRearrange lr = new POLocalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
         lr.setIndex(0);
-        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : 
DataType.BYTEARRAY);
+        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+            keyType);
         lr.setPlans(eps1);
         lr.setResultType(DataType.TUPLE);
         mro.mapPlan.addAsLeaf(lr);
         
         mro.setMapDone(true);
         
-        if (limit!=-1)
-        {
+        if (limit!=-1) {
                POPackage pkg_c = new POPackage(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-               pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : 
DataType.BYTEARRAY);
+               pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
             pkg_c.setNumInps(1);
             //pkg.setResultType(DataType.TUPLE);
             boolean[] inner = {false};
@@ -921,6 +942,8 @@
                mro.combinePlan.addAsLeaf(pLimit);
             
             List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
+            eps_c2.addAll(sort.getSortPlans());
+            /*
             for (int i : fields) {
                    PhysicalPlan ep_c2 = new PhysicalPlan();
                    POProject prj_c2 = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -930,17 +953,19 @@
                    ep_c2.add(prj_c2);
                    eps_c2.add(ep_c2);
                }
+            */
         
                POLocalRearrange lr_c2 = new POLocalRearrange(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
                lr_c2.setIndex(0);
-               lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : 
DataType.BYTEARRAY);
+               lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
                lr_c2.setPlans(eps_c2);
                lr_c2.setResultType(DataType.TUPLE);
                mro.combinePlan.addAsLeaf(lr_c2);
         }
         
         POPackage pkg = new POPackage(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : 
DataType.BYTEARRAY);
+        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+            keyType);
         pkg.setNumInps(1);
         boolean[] inner = {false}; 
         pkg.setInner(inner);
@@ -1186,5 +1211,20 @@
             }
         }
     }
+
+    private class FindKeyTypeVisitor extends PhyPlanVisitor {
+
+        byte keyType = DataType.UNKNOWN;
+
+        FindKeyTypeVisitor(PhysicalPlan plan) {
+            super(plan,
+                new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitProject(POProject p) throws VisitorException {
+            keyType = p.getResultType();
+        }
+    }
     
 }


Reply via email to