Author: olga
Date: Wed Dec 24 10:11:58 2008
New Revision: 729346

URL: http://svn.apache.org/viewvc?rev=729346&view=rev
Log:
missing files for PIG-563

Added:
    
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java

Added: 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java?rev=729346&view=auto
==============================================================================
--- 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
 (added)
+++ 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
 Wed Dec 24 10:11:58 2008
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+/**
+ * The package operator that packages the globally rearranged tuples into
+ * output format after the combiner stage.  It differs from POPackage in that
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in.  Intead, the inputs are put in a bag corresponding to their 
+ * offset in the tuple.
+ */
+public class POCombinerPackage extends POPackage {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    private static BagFactory mBagFactory = BagFactory.getInstance();
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    private boolean[] mBags; // For each field, indicates whether or not it
+                             // needs to be put in a bag.
+
+    /**
+     * A new POPostCombinePackage will be constructed as a near clone of the
+     * provided POPackage.
+     * @param pkg POPackage to clone.
+     * @param bags for each field, indicates whether it should be a bag (true)
+     * or a simple field (false).
+     */
+    public POCombinerPackage(POPackage pkg, boolean[] bags) {
+        super(new OperatorKey(pkg.getOperatorKey().scope,
+            
NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
+            pkg.getRequestedParallelism(), pkg.getInputs());
+        resultType = pkg.getResultType();
+        keyType = pkg.keyType;
+        numInputs = 1;
+        inner = new boolean[1];
+        for (int i = 0; i < pkg.inner.length; i++) {
+            inner[i] = true;
+        }
+        mBags = bags;
+    }
+
+    @Override
+    public String name() {
+        return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) 
+ "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        int keyField = -1;
+        //Create numInputs bags
+        Object[] fields = new Object[mBags.length];
+        for (int i = 0; i < mBags.length; i++) {
+            if (mBags[i]) fields[i] = mBagFactory.newDefaultBag();
+        }
+        
+        // For each indexed tup in the inp, split them up and place their
+        // fields into the proper bags.  If the given field isn't a bag, just
+        // set the value as is.
+        while (tupIter.hasNext()) {
+            NullableTuple ntup = tupIter.next();
+            Tuple tup = (Tuple)ntup.getValueAsPigType();
+            // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+            // group case and not in cogroups. So there should only
+            // be one LocalRearrange from which we get the keyInfo for
+            // which field in the value is in the key. This LocalRearrange
+            // has an index of -1. When we do support combiner in Cogroups
+            // THIS WILL NEED TO BE REVISITED.
+            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+                keyInfo.get(0); // assumption: only group are "combinable", 
hence index 0
+            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+            int tupIndex = 0; // an index for accessing elements from 
+                              // the value (tup) that we have currently
+            for(int i = 0; i < mBags.length; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if(keyIndex == null) {
+                    // the field for this index is not the
+                    // key - so just take it from the "value"
+                    // we were handed - Currently THIS HAS TO BE A BAG
+                    // In future if this changes, THIS WILL NEED TO BE
+                    // REVISITED.
+                    ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+                    tupIndex++;
+                } else {
+                    // the field for this index is in the key
+                    fields[i] = key;
+                }
+            }
+        }
+        
+        // The successor of the POCombinerPackage as of 
+        // now SHOULD be a POForeach which has been adjusted
+        // to look for its inputs by projecting from the corresponding
+               // positions in the POCombinerPackage output.
+               // So we will NOT be adding the key in the result here but 
merely 
+        // putting all bags into a result tuple and returning it. 
+        Tuple res;
+        res = mTupleFactory.newTuple(mBags.length);
+        for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+
+    }
+
+}

Added: 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=729346&view=auto
==============================================================================
--- 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 (added)
+++ 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 Wed Dec 24 10:11:58 2008
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A specialized local rearrange operator which behaves
+ * like the regular local rearrange in the getNext() 
+ * as far as getting its input and constructing the 
+ * "key" out of the input. It then returns a tuple with
+ * two fields - the key in the first position and the
+ * "value" inside a bag in the second position. This output
+ * format resembles the format out of a Package. This output
+ * will feed to a foreach which expects this format.  
+ */
+public class POPreCombinerLocalRearrange extends PhysicalOperator {
+
+    protected static final long serialVersionUID = 1L;
+
+    protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
+    protected static BagFactory mBagFactory = BagFactory.getInstance();
+
+    protected List<PhysicalPlan> plans;
+    
+    protected List<ExpressionOperator> leafOps;
+
+    protected byte keyType;
+
+    public POPreCombinerLocalRearrange(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POPreCombinerLocalRearrange(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POPreCombinerLocalRearrange(OperatorKey k, List<PhysicalOperator> 
inp) {
+        this(k, -1, inp);
+    }
+
+    public POPreCombinerLocalRearrange(OperatorKey k, int rp, 
List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        leafOps = new ArrayList<ExpressionOperator>();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitPreCombinerLocalRearrange(this);
+    }
+
+    @Override
+    public String name() {
+        return "Pre Combiner Local Rearrange" + "[" + 
DataType.findTypeName(resultType) +
+            "]" + "{" + DataType.findTypeName(keyType) + "} - " + 
mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+    }
+    
+    /**
+     * Calls getNext on the generate operator inside the nested
+     * physical plan. Converts the generated tuple into the proper
+     * format, i.e, (key,indexedTuple(value))
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        
+        Result inp = null;
+        Result res = null;
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == 
POStatus.STATUS_ERR)
+                break;
+            if (inp.returnStatus == POStatus.STATUS_NULL)
+                continue;
+            
+            for (PhysicalPlan ep : plans) {
+                ep.attachInput((Tuple)inp.result);
+            }
+            List<Result> resLst = new ArrayList<Result>();
+            for (ExpressionOperator op : leafOps){
+                
+                switch(op.getResultType()){
+                case DataType.BAG:
+                    res = op.getNext(dummyBag);
+                    break;
+                case DataType.BOOLEAN:
+                    res = op.getNext(dummyBool);
+                    break;
+                case DataType.BYTEARRAY:
+                    res = op.getNext(dummyDBA);
+                    break;
+                case DataType.CHARARRAY:
+                    res = op.getNext(dummyString);
+                    break;
+                case DataType.DOUBLE:
+                    res = op.getNext(dummyDouble);
+                    break;
+                case DataType.FLOAT:
+                    res = op.getNext(dummyFloat);
+                    break;
+                case DataType.INTEGER:
+                    res = op.getNext(dummyInt);
+                    break;
+                case DataType.LONG:
+                    res = op.getNext(dummyLong);
+                    break;
+                case DataType.MAP:
+                    res = op.getNext(dummyMap);
+                    break;
+                case DataType.TUPLE:
+                    res = op.getNext(dummyTuple);
+                    break;
+                }
+                if(res.returnStatus!=POStatus.STATUS_OK)
+                    return new Result();
+                resLst.add(res);
+            }
+            res.result = constructLROutput(resLst,(Tuple)inp.result);
+            
+            return res;
+        }
+        return inp;
+    }
+    
+    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws 
ExecException{
+        //Construct key
+        Object key;
+        if(resLst.size()>1){
+            Tuple t = mTupleFactory.newTuple(resLst.size());
+            int i=-1;
+            for(Result res : resLst)
+                t.set(++i, res.result);
+            key = t;
+        }
+        else{
+            key = resLst.get(0).result;
+        }
+        
+        Tuple output = mTupleFactory.newTuple(2);
+        output.set(0, key);
+        // put the value in a bag so that the initial
+        // version of the Algebraics will get a bag as
+        // they would expect.
+        DataBag bg = mBagFactory.newDefaultBag();
+        bg.add(value);
+        output.set(1, bg);
+        return output;
+    }
+
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    public List<PhysicalPlan> getPlans() {
+        return plans;
+    }
+
+    public void setPlans(List<PhysicalPlan> plans) {
+        this.plans = plans;
+        leafOps.clear();
+        for (PhysicalPlan plan : plans) {
+            ExpressionOperator leaf = 
(ExpressionOperator)plan.getLeaves().get(0); 
+            leafOps.add(leaf);
+        }            
+    }
+
+}


Reply via email to