Author: olga
Date: Mon Jan 5 15:57:28 2009
New Revision: 731777
URL: http://svn.apache.org/viewvc?rev=731777&view=rev
Log:
PIG-580: use of combiner for distributed agg computations
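For context, the kind of script this change targets (a hypothetical example, not part of the commit; 'visits' and its schema are illustrative):

    a = load 'visits' as (user:chararray, url:chararray);
    b = group a by user;
    c = foreach b { urls = distinct a.url; generate group, COUNT(urls); };

With this patch, the map and combine phases apply the new org.apache.pig.builtin.Distinct UDF (Initial and Intermediate stages) to shrink the duplicate-laden bags early, and the reduce applies the Final stage before the surrounding aggregate.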
Added:
hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Jan 5 15:57:28 2009
@@ -344,4 +344,6 @@
PIG-522: make negation work (pradeepk via olgan)
- PIG-563: support for multiple combiner invocations
+ PIG-563: support for multiple combiner invocations (pradeepk via olgan)
+
+ PIG-580: using combiner to compute distinct aggs (pradeepk via olgan)
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Jan 5 15:57:28 2009
@@ -264,7 +264,7 @@
public void explain(PhysicalPlan plan, PrintStream stream) {
try {
- PlanPrinter printer = new PlanPrinter(plan);
+ PlanPrinter printer = new PlanPrinter(plan, stream);
printer.visit();
stream.println();
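With the stream threaded through, explain output becomes capturable by the caller instead of always landing on System.out. A minimal sketch of the pattern (the same one the new TestCombiner tests below use; assumes an already-configured PigServer named pigServer):

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    PrintStream ps = new PrintStream(baos);
    pigServer.explain("c", ps);  // plan text goes to ps rather than System.out
    boolean hasCombinePlan = baos.toString().matches("(?si).*combine plan.*");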
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Mon Jan 5 15:57:28 2009
@@ -18,11 +18,13 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
@@ -43,10 +45,12 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
@@ -93,6 +97,8 @@
*/
public class CombinerOptimizer extends MROpPlanVisitor {
+ private static final String DISTINCT_UDF_CLASSNAME = org.apache.pig.builtin.Distinct.class.getName();
+
private Log log = LogFactory.getLog(getClass());
private enum ExprType { SIMPLE_PROJECT, ALGEBRAIC, NOT_ALGEBRAIC,
@@ -340,7 +346,8 @@
for (int i = 0; i < plans.size(); i++) {
ExprType t = algebraic(plans.get(i), flattens.get(i), i);
types.add(t);
- atLeastOneAlgebraic |= (t == ExprType.ALGEBRAIC);
+ atLeastOneAlgebraic |=
+ (t == ExprType.ALGEBRAIC || t == ExprType.DISTINCT);
noNonAlgebraics &= (t != ExprType.NOT_ALGEBRAIC);
}
if (!atLeastOneAlgebraic || !noNonAlgebraics) return null;
@@ -368,7 +375,10 @@
AlgebraicPlanChecker apc = new AlgebraicPlanChecker(pp);
apc.visit();
if (apc.sawNonAlgebraic) return ExprType.NOT_ALGEBRAIC;
-
+ if(apc.sawDistinctAgg) return ExprType.DISTINCT;
+
+ // we did not see a non-algebraic expression or a distinct agg so far
+ // proceed to check leaves
PhysicalOperator leaf = leaves.get(0);
if (leaf instanceof POProject) {
POProject proj = (POProject)leaf;
@@ -414,6 +424,49 @@
changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
+ } else if (exprs.get(i) == ExprType.DISTINCT) {
+ // A PODistinct in the plan will always have
+ // a Project[bag](*) as its successor.
+ // We will replace it with a POUserFunc with "Distinct" as
+ // the underlying UDF.
+ // In the map and combine, we will make this POUserFunc
+ // the leaf of the plan by removing other operators which
+ // are descendants up to the leaf.
+ // In the reduce we will keep descendants intact. Further
+ // down in fixProjectAndInputs we will change the inputs to
+ // this POUserFunc in the combine and reduce plans to be
+ // just projections of the column "i"
+ PhysicalPlan[] plans = new PhysicalPlan[] {
+ mPlans.get(i), cPlans.get(i), rPlans.get(i) };
+ byte[] funcTypes = new byte[] { POUserFunc.INITIAL,
+ POUserFunc.INTERMEDIATE, POUserFunc.FINAL };
+ for (int j = 0; j < plans.length; j++) {
+ DistinctPatcher dp = new DistinctPatcher(plans[j]);
+ try {
+ dp.visit();
+ } catch (VisitorException e) {
+ throw new PlanException(e);
+ }
+
+
+ PhysicalOperator leaf = plans[j].getLeaves().get(0);
+ // make the Distinct POUserFunc the leaf in the map and combine plans.
+ if( j != plans.length - 1) {
+ while(!((leaf instanceof POUserFunc) &&
+ ((POUserFunc)leaf).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME))) {
+ plans[j].remove(leaf);
+ // get the new leaf
+ leaf = plans[j].getLeaves().get(0);
+ }
+
+ }
+ // Also set the Distinct's function to type Initial in map,
+ // to type Intermediate in combine plan and to type Final in
+ // the reduce
+ POUserFunc distinctFunc = (POUserFunc)getDistinctUserFunc(plans[j], leaf);
+ distinctFunc.setAlgebraicFunction(funcTypes[j]);
+ }
+
}
}
@@ -439,44 +492,94 @@
// they are in ( we just want to take output from the combine and
// use that as input in the reduce/combine plan). UDFs will be left the same but their
// inputs altered. Any straight projections will also be altered.
- fixProjectAndInputs(cPlans);
- fixProjectAndInputs(rPlans);
+ fixProjectAndInputs(cPlans, exprs);
+ fixProjectAndInputs(rPlans, exprs);
+
+
+ // we have modified the foreach inner plans - so set them
+ // again for the foreach so that foreach can do any re-initialization
+ // around them.
+ // FIXME - this is a necessary evil right now because the leaves are explicitly
+ // stored in the POForeach as a list rather than computed each time at
+ // run time from the plans for optimization. Do we want to have the Foreach
+ // compute the leaves each time and have Java optimize it (will Java optimize?)?
+ mfe.setInputPlans(mPlans);
+ cfe.setInputPlans(cPlans);
+ rfe.setInputPlans(rPlans);
}
/**
* @param plans
+ * @param exprs
* @throws PlanException
*/
- private void fixProjectAndInputs(List<PhysicalPlan> plans) throws PlanException {
+ private void fixProjectAndInputs(List<PhysicalPlan> plans, List<ExprType> exprs) throws PlanException {
for (int i = 0; i < plans.size(); i++) {
- List<PhysicalOperator> leaves = plans.get(i).getLeaves();
- if (leaves == null || leaves.size() != 1) {
- throw new RuntimeException("Expected to find plan with single leaf!");
- }
- PhysicalOperator leaf = leaves.get(0);
+ List<PhysicalOperator> leaves = plans.get(i).getLeaves();
+ if (leaves == null || leaves.size() != 1) {
+ throw new RuntimeException("Expected to find plan with single leaf!");
+ }
+ PhysicalOperator leaf = leaves.get(0);
+ // the combine plan could have an extra foreach inner plan
+ // to project the key - so make sure we check the index
+ // before looking in exprs
+ if(i < exprs.size() && exprs.get(i) == ExprType.DISTINCT) {
+ // if there is a distinctagg, we have to
+ // look for the Distinct POUserFunc and
+ // change its input to be a project of
+ // column "i"
+ PhysicalOperator op = getDistinctUserFunc(plans.get(i), leaf);
+ setProjectInput(op, plans.get(i), i);
+ } else {
+ // Leaf should be either a projection or a UDF
+ if (leaf instanceof POProject) {
+ ((POProject)leaf).setColumn(i);
+ } else if (leaf instanceof POUserFunc) {
+ setProjectInput(leaf, plans.get(i), i);
+ }
+ }
+ }
+ }
- // Leaf should be either a projection or a UDF
- if (leaf instanceof POProject) {
- ((POProject)leaf).setColumn(i);
- } else if (leaf instanceof POUserFunc) {
- String scope = leaf.getOperatorKey().scope;
- POProject proj = new POProject(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)),
- leaf.getRequestedParallelism(), i);
- proj.setResultType(DataType.BAG);
- // Remove old connections and elements from the plan
- plans.get(i).trimAbove(leaf);
- plans.get(i).add(proj);
- plans.get(i).connect(proj, leaf);
- List<PhysicalOperator> inputs =
- new ArrayList<PhysicalOperator>(1);
- inputs.add(proj);
- leaf.setInputs(inputs);
+ /**
+ * @param op
+ * @param index
+ * @param plan
+ * @throws PlanException
+ */
+ private void setProjectInput(PhysicalOperator op, PhysicalPlan plan, int index) throws PlanException {
+ String scope = op.getOperatorKey().scope;
+ POProject proj = new POProject(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+ op.getRequestedParallelism(), index);
+ proj.setResultType(DataType.BAG);
+ // Remove old connections and elements from the plan
+ plan.trimAbove(op);
+ plan.add(proj);
+ plan.connect(proj, op);
+ List<PhysicalOperator> inputs =
+ new ArrayList<PhysicalOperator>(1);
+ inputs.add(proj);
+ op.setInputs(inputs);
+
+ }
+
+ /**
+ * @param plan
+ * @param operator
+ * @return
+ */
+ private PhysicalOperator getDistinctUserFunc(PhysicalPlan plan, PhysicalOperator operator) {
+ if(operator instanceof POUserFunc ) {
+ if(((POUserFunc)operator).getFuncSpec().getClassName().startsWith(DISTINCT_UDF_CLASSNAME)) {
+ return operator;
}
}
-
+ return getDistinctUserFunc(plan, plan.getPredecessors(operator).get(0));
+
}
+
/**
* @param fe
*/
@@ -520,16 +623,106 @@
private class AlgebraicPlanChecker extends PhyPlanVisitor {
boolean sawNonAlgebraic = false;
+ boolean sawDistinctAgg = false;
+ private boolean sawForeach = false;
AlgebraicPlanChecker(PhysicalPlan plan) {
- super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+ super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
}
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.plan.PlanVisitor#visit()
+ */
+ @Override
+ public void visit() throws VisitorException {
+ super.visit();
+ // if we saw foreach and distinct agg its ok
+ // else if we only saw foreach, mark it as non algebraic
+ if(sawForeach && !sawDistinctAgg) {
+ sawNonAlgebraic = true;
+ }
+ }
+
@Override
public void visitDistinct(PODistinct distinct) throws VisitorException {
+ if(sawDistinctAgg) {
+ // we want to combine only in the case where there is only
+ // one PODistinct which is the only input to an agg
+ // we apparently have seen a PODistinct before, so let's not
+ // combine.
+ sawNonAlgebraic = true;
+ }
+ // check that this distinct is the only input to an agg
+ // We could have the following two cases
+ // script 1:
+ // ..
+ // b = group a by ...
+ // c = foreach b { x = distinct a; generate AGG(x), ...}
+ // The above script leads to the following plan for AGG(x):
+ // POUserFunc(org.apache.pig.builtin.COUNT)[long]
+ // |
+ // |---Project[bag][*]
+ // |
+ // |---PODistinct[bag]
+ // |
+ // |---Project[tuple][1]
+
+ // script 2:
+ // ..
+ // b = group a by ...
+ // c = foreach b { x = distinct a; generate AGG(x.$1), ...}
+ // The above script leads to the following plan for AGG(x.$1):
+ // POUserFunc(org.apache.pig.builtin.IntSum)[long]
+ // |
+ // |---Project[bag][1]
+ // |
+ // |---Project[bag][*]
+ // |
+ // |---PODistinct[bag]
+ // |
+ // |---Project[tuple][1]
+ // So tracing from the PODistinct to its successors up to the leaf, we should
+ // see a Project[bag][*] as the immediate successor and an optional Project[bag]
+ // as the next successor till we see the leaf.
+ PhysicalOperator leaf = mPlan.getLeaves().get(0);
+ // the leaf has to be a POUserFunc (need not be algebraic)
+ if(leaf instanceof POUserFunc) {
+ List<PhysicalOperator> immediateSuccs = mPlan.getSuccessors(distinct);
+ if(immediateSuccs.size() == 1 && immediateSuccs.get(0) instanceof POProject) {
+ if(checkSuccessorIsLeaf(leaf, immediateSuccs.get(0))) { // script 1 above
+ sawDistinctAgg = true;
+ return;
+ } else { // check for script 2 scenario above
+ List<PhysicalOperator> nextSuccs = mPlan.getSuccessors(immediateSuccs.get(0));
+ if(nextSuccs.size() == 1) {
+ PhysicalOperator op = nextSuccs.get(0);
+ if(op instanceof POProject) {
+ if(checkSuccessorIsLeaf(leaf, op)) {
+ sawDistinctAgg = true;
+ return;
+ }
+ }
+ }
+
+ }
+ }
+ }
+ // if we did not return above, that means we did not see
+ // the pattern we expected
sawNonAlgebraic = true;
}
-
+
+ private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, PhysicalOperator opToCheck) {
+ List<PhysicalOperator> succs = mPlan.getSuccessors(opToCheck);
+ if(succs.size() == 1) {
+ PhysicalOperator op = succs.get(0);
+ if(op == leaf) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public void visitFilter(POFilter filter) throws VisitorException {
sawNonAlgebraic = true;
@@ -537,7 +730,11 @@
@Override
public void visitPOForEach(POForEach fe) throws VisitorException {
- sawNonAlgebraic = true;
+ // we need to allow foreach as input for distinct
+ // but don't want it for other things (why?). So let's
+ // flag the presence of Foreach and if this is present
+ // with a distinct agg, it will be allowed.
+ sawForeach = true;
}
@Override
@@ -546,6 +743,84 @@
}
}
+
+ /**
+ * A visitor to replace
+ * Project[bag][*]
+ * |
+ * |---PODistinct[bag]
+ * with
+ * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
+ */
+ private class DistinctPatcher extends PhyPlanVisitor {
+
+ public boolean patched = false;
+ /**
+ * @param plan
+ * @param walker
+ */
+ public DistinctPatcher(PhysicalPlan plan,
+ PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
+ super(plan, walker);
+ }
+
+ /**
+ * @param physicalPlan
+ */
+ public DistinctPatcher(PhysicalPlan physicalPlan) {
+ this(physicalPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(physicalPlan));
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+ */
+ @Override
+ public void visitProject(POProject proj) throws VisitorException {
+ // check if this project is preceded by PODistinct and
+ // has the return type bag
+
+
+ List<PhysicalOperator> preds = mPlan.getPredecessors(proj);
+ if(preds == null) return; // this is a leaf project and so not interesting for patching
+ PhysicalOperator pred = preds.get(0);
+ if(preds.size() == 1 && pred instanceof PODistinct) {
+ if(patched) {
+ // we should not already have been patched since the
+ // Project-Distinct pair should occur only once
+ throw new VisitorException(
+ "Unexpected Project-Distinct pair while trying to set up plans for use with combiner.");
+ }
+ // we have to stick in the POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
+ // in place of the Project-PODistinct pair
+ PhysicalOperator distinctPredecessor = mPlan.getPredecessors(pred).get(0);
+
+ try {
+ String scope = proj.getOperatorKey().scope;
+ List<PhysicalOperator> funcInput = new ArrayList<PhysicalOperator>();
+ FuncSpec fSpec = new FuncSpec(DISTINCT_UDF_CLASSNAME);
+ funcInput.add(distinctPredecessor);
+ // explicitly set distinctPredecessor's result type to
+ // be tuple - this is relevant when distinctPredecessor is
+ // originally a POForeach with return type BAG - we need to
+ // set it to tuple so we get a stream of tuples.
+ distinctPredecessor.setResultType(DataType.TUPLE);
+ POUserFunc func = new POUserFunc(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)),-1, funcInput, fSpec);
+ func.setResultType(DataType.BAG);
+ mPlan.replace(proj, func);
+ mPlan.remove(pred);
+ // connect the newly added "func" to
+ // the predecessor of the earlier PODistinct
+ mPlan.connect(distinctPredecessor, func);
+ } catch (PlanException e) {
+ throw new VisitorException(e);
+ }
+ patched = true;
+ }
+ }
+
+ }
+
// Reset any member variables since we may have already visited one
// combine.
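To summarize the rewrite this visitor performs, a sketch of the foreach inner plan for AGG(x) with x = distinct a, reconstructed from the comments in this file (not verbatim explain output):

    before (every phase):

        POUserFunc(AGG)[long]
        |
        |---Project[bag][*]
            |
            |---PODistinct[bag]
                |
                |---Project[tuple][1]

    after, map plan (Distinct UDF made the leaf, set to type Initial):

        POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
        |
        |---Project[tuple][1]

    after, combine and reduce plans (fixProjectAndInputs rewires the Distinct
    UDF's input to a projection of column i; only the reduce keeps AGG on top,
    with the Distinct UDF set to type Intermediate/Final respectively):

        POUserFunc(AGG)[long]
        |
        |---POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
            |
            |---Project[bag][i]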
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Mon Jan 5 15:57:28 2009
@@ -52,19 +52,19 @@
mStream.println("MapReduce node " + mr.getOperatorKey().toString());
if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
mStream.println("Map Plan");
- PlanPrinter printer = new PlanPrinter(mr.mapPlan);
+ PlanPrinter printer = new PlanPrinter(mr.mapPlan, mStream);
printer.visit();
mStream.println("--------");
}
if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
mStream.println("Combine Plan");
- PlanPrinter printer = new PlanPrinter(mr.combinePlan);
+ PlanPrinter printer = new PlanPrinter(mr.combinePlan, mStream);
printer.visit();
mStream.println("--------");
}
if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
mStream.println("Reduce Plan");
- PlanPrinter printer = new PlanPrinter(mr.reducePlan);
+ PlanPrinter printer = new PlanPrinter(mr.reducePlan, mStream);
printer.visit();
mStream.println("--------");
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Mon Jan 5 15:57:28 2009
@@ -118,6 +118,29 @@
}
super.remove(op);
}
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.plan.OperatorPlan#replace(org.apache.pig.impl.plan.Operator, org.apache.pig.impl.plan.Operator)
+ */
+ @Override
+ public void replace(PhysicalOperator oldNode, PhysicalOperator newNode)
+ throws PlanException {
+ List<PhysicalOperator> oldNodeSuccessors = getSuccessors(oldNode);
+ super.replace(oldNode, newNode);
+ if(oldNodeSuccessors != null) {
+ for (PhysicalOperator preds : oldNodeSuccessors) {
+ List<PhysicalOperator> inputs = preds.getInputs();
+ // now replace oldNode with newNode in
+ // the input list of oldNode's successors
+ for(int i = 0; i < inputs.size(); i++) {
+ if(inputs.get(i) == oldNode) {
+ inputs.set(i, newNode);
+ }
+ }
+ }
+ }
+
+ }
/* (non-Javadoc)
* @see org.apache.pig.impl.plan.OperatorPlan#add(org.apache.pig.impl.plan.Operator)
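The override above is needed because a PhysicalOperator reads from a cached input list at execution time, while super.replace() only rewires the plan's edges. A minimal sketch of the invariant being restored (hypothetical operators, illustrative only):

    // plan: pred -> oldNode -> succ,  succ.getInputs() == [oldNode]
    plan.replace(oldNode, newNode);
    // plan: pred -> newNode -> succ,  succ.getInputs() == [newNode]
    //                                 (kept in sync by the fix-up loop)

Without that loop, a successor such as the POUserFunc inserted by DistinctPatcher would still pull tuples from the detached old operator.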
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Mon Jan 5 15:57:28 2009
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -48,15 +49,22 @@
int levelCntr = -1;
OutputStream printer;
+
+ PrintStream stream = System.out;
public PlanPrinter(P plan) {
super(plan, new DepthFirstWalker<O, P>(plan));
}
+
+ public PlanPrinter(P plan, PrintStream stream) {
+ super(plan, new DepthFirstWalker<O, P>(plan));
+ this.stream = stream;
+ }
@Override
public void visit() throws VisitorException {
try {
- System.out.write(depthFirstPP().getBytes());
+ stream.write(depthFirstPP().getBytes());
} catch (IOException e) {
throw new VisitorException(e.getMessage());
}
@@ -89,7 +97,7 @@
}
}
if (newPredecessors.size() > 0) {
- System.out.println();
+ stream.println();
breadthFirst(newPredecessors, seen);
}
}
@@ -182,35 +190,35 @@
private void dispTabs() {
for (int i = 0; i < levelCntr; i++)
- System.out.print(TAB1);
+ stream.print(TAB1);
}
public void visitLoad(POLoad op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitStore(POStore op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitFilter(POFilter op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitLocalRearrange(POLocalRearrange op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitGlobalRearrange(POGlobalRearrange op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitPackage(POPackage op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
public void visitStartMap(POUnion op) {
- System.out.print(op.name() + " ");
+ stream.print(op.name() + " ");
}
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Mon Jan 5 15:57:28 2009
@@ -32,6 +32,7 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -42,7 +43,7 @@
*
*
*/
-public class PODistinct extends PhysicalOperator {
+public class PODistinct extends PhysicalOperator implements Cloneable {
private boolean inputsAccumulated = false;
private DataBag distinctBag = null;
@@ -121,5 +122,13 @@
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitDistinct(this);
}
+ /* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#clone()
+ */
+ @Override
+ public PODistinct clone() throws CloneNotSupportedException {
+ // TODO Auto-generated method stub
+ return new PODistinct(new OperatorKey(this.mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), this.requestedParallelism, this.inputs);
+ }
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Jan 5 15:57:28 2009
@@ -497,9 +497,11 @@
flattens.add(b);
}
}
- return new POForEach(new OperatorKey(mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
- requestedParallelism, plans, flattens);
+ POForEach clone = new POForEach(new OperatorKey(mKey.scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+ requestedParallelism, plans, flattens);
+ clone.setResultType(getResultType());
+ return clone;
}
public boolean inProcessing()
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Mon Jan 5 15:57:28 2009
@@ -186,7 +186,7 @@
public void explain(PhysicalPlan plan, PrintStream stream) {
try {
- PlanPrinter printer = new PlanPrinter(plan);
+ PlanPrinter printer = new PlanPrinter(plan, stream);
printer.visit();
stream.println();
Added: hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java?rev=731777&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/Distinct.java Mon Jan 5 15:57:28 2009
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.WrappedIOException;
+
+/**
+ * Find the distinct set of tuples in a bag.
+ * This is a blocking operator. All the input is put in the hashset implemented
+ * in DistinctDataBag which also provides the other DataBag interfaces.
+ *
+ *
+ */
+public class Distinct extends EvalFunc<DataBag> implements Algebraic {
+
+ private static BagFactory bagFactory = BagFactory.getInstance();
+ private static TupleFactory tupleFactory = TupleFactory.getInstance();
+ /* (non-Javadoc)
+ * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ return getDistinct(input);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.Algebraic#getFinal()
+ */
+ @Override
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.Algebraic#getInitial()
+ */
+ @Override
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.Algebraic#getIntermed()
+ */
+ @Override
+ public String getIntermed() {
+ return Intermediate.class.getName();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ // the input has a single field which is a tuple
+ // representing the data we want to distinct.
+ // unwrap, put in a bag and send down
+ try {
+ DataBag bag = bagFactory.newDefaultBag();
+ bag.add((Tuple)input.get(0));
+ return tupleFactory.newTuple(bag);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap(e);
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ return tupleFactory.newTuple(getDistinctFromNestedBags(input));
+ }
+ }
+
+ static public class Final extends EvalFunc<DataBag> {
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ return getDistinctFromNestedBags(input);
+ }
+ }
+
+ static private DataBag getDistinctFromNestedBags(Tuple input) throws IOException {
+ DataBag result = bagFactory.newDistinctBag();
+ try {
+ DataBag bg = (DataBag)input.get(0);
+ for (Tuple tuple : bg) {
+ // Each tuple has a single column
+ // which is a bag. Get tuples out of it
+ // and distinct over all tuples
+ for (Tuple t : (DataBag)tuple.get(0)) {
+ result.add(t);
+ }
+ }
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap(e);
+ }
+ return result;
+ }
+
+ static protected DataBag getDistinct(Tuple input) throws IOException {
+ try {
+ DataBag inputBg = (DataBag)input.get(0);
+ DataBag result = bagFactory.newDistinctBag();
+ for (Tuple tuple : inputBg) {
+ result.add(tuple);
+ }
+ return result;
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap(e);
+ }
+ }
+
+}
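A minimal sketch of driving the new builtin directly (assumed usage for illustration; in the combiner rewrite above it is injected automatically rather than written by users, and the class name DistinctUdfSketch is hypothetical):

    import java.io.IOException;
    import org.apache.pig.builtin.Distinct;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.TupleFactory;

    public class DistinctUdfSketch {
        public static void main(String[] args) throws IOException {
            TupleFactory tf = TupleFactory.getInstance();
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for (int v : new int[] { 1, 2, 2, 3 }) {
                bag.add(tf.newTuple((Object) v)); // one-column tuples, with a duplicate
            }
            // exec() expects a tuple whose single field is the bag to de-duplicate
            DataBag result = new Distinct().exec(tf.newTuple((Object) bag));
            System.out.println(result); // {(1),(2),(3)}
        }
    }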
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/MiniCluster.java Mon Jan 5 15:57:28 2009
@@ -105,4 +105,8 @@
public Properties getProperties() {
return ConfigurationUtil.toProperties(m_conf);
}
+
+ public FileSystem getFileSystem() {
+ return m_fileSys;
+ }
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Mon Jan 5 15:57:28 2009
@@ -908,6 +908,40 @@
}
+
+ @Test
+ public void testDistinct() throws Exception {
+
+ Integer[] inp = new Integer[] { 1, 2 , 3, 1 ,4, 5, 3};
+ DataBag inputBag = Util.createBagOfOneColumn(inp);
+ EvalFunc<Tuple> initial = new Distinct.Initial();
+ DataBag intermedInputBg1 = bagFactory.newDefaultBag();
+ DataBag intermedInputBg2 = bagFactory.newDefaultBag();
+ int i = 0;
+ for (Tuple t : inputBag) {
+ Tuple initialOutput = initial.exec(tupleFactory.newTuple(t));
+ if(i < inp.length/2 ) {
+ intermedInputBg1.add(initialOutput);
+ } else {
+ intermedInputBg2.add(initialOutput);
+ }
+ i++;
+ }
+
+ EvalFunc<Tuple> intermed = new Distinct.Intermediate();
+
+ DataBag finalInputBg = bagFactory.newDefaultBag();
+ finalInputBg.add(intermed.exec(tupleFactory.newTuple(intermedInputBg1)));
+ finalInputBg.add(intermed.exec(tupleFactory.newTuple(intermedInputBg2)));
+ EvalFunc<DataBag> fin = new Distinct.Final();
+ DataBag result = fin.exec(tupleFactory.newTuple(finalInputBg));
+
+ Integer[] exp = new Integer[] { 1, 2, 3, 4, 5};
+ DataBag expectedBag = Util.createBagOfOneColumn(exp);
+ assertEquals(expectedBag, result);
+
+ }
+
@Test
public void testCONCAT() throws Exception {
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java Mon Jan 5 15:57:28 2009
@@ -17,11 +17,13 @@
*/
package org.apache.pig.test;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -105,13 +107,20 @@
input[i] = Integer.toString(0);
}
}
- File f1 = Util.createFile(input);
+ Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
Properties props = cluster.getProperties();
props.setProperty("io.sort.mb", "1");
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
- pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString()) + "' as (x:int);");
+ pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0),
MIN(a.$0), MAX(a.$0), AVG(a.$0);");
+
+ // make sure there is a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+
Iterator<Tuple> it = pigServer.openIterator("c");
Tuple t = it.next();
assertEquals(512000L, t.get(0));
@@ -120,6 +129,156 @@
assertEquals(1, t.get(3));
assertEquals(0.5, t.get(4));
assertFalse(it.hasNext());
+ Util.deleteFile(cluster, "MultiCombinerUseInput.txt");
+ }
+
+ @Test
+ public void testDistinctAggs1() throws Exception {
+ // test the use of combiner for distinct aggs:
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a by name;");
+ pigServer.registerQuery("c = foreach b {" +
+ " x = distinct a.age;" +
+ " y = distinct a.gpa;" +
+ " z = distinct a;" +
+ " generate group, COUNT(x), SUM(x.age),
SUM(y.gpa), SUM(a.age), " +
+ " SUM(a.gpa), COUNT(z.age),
COUNT(z), SUM(z.age);};");
+
+ // make sure there is a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+
+ HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+ results.put("pig1", new Object[]
{"pig1",3L,57L,5.2,75L,9.4,3L,3L,57L});
+ results.put("pig2", new Object[]
{"pig2",1L,24L,7.8,48L,7.8,2L,2L,48L});
+ results.put("pig5", new Object[]
{"pig5",1L,45L,2.4,45L,2.4,1L,1L,45L});
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ List<Object> fields = t.getAll();
+ Object[] expected = results.get((String)fields.get(0));
+ int i = 0;
+ for (Object field : fields) {
+ assertEquals(expected[i++], field);
+ }
+ }
+ Util.deleteFile(cluster, "distinctAggs1Input.txt");
+
+ }
+
+ @Test
+ public void testDistinctNoCombiner() throws Exception {
+ // test that combiner is NOT invoked when
+ // one of the elements in the foreach generate
+ // is a distinct() as the leaf
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a by name;");
+ pigServer.registerQuery("c = foreach b {" +
+ " z = distinct a;" +
+ " generate group, z, SUM(a.age), SUM(a.gpa);};");
+
+ // make sure there is a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+
+ HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+ results.put("pig1", new Object[] {"pig1","bag-place-holder",75L,9.4});
+ results.put("pig2", new Object[] {"pig2","bag-place-holder",48L,7.8});
+ results.put("pig5", new Object[] {"pig5","bag-place-holder",45L,2.4});
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ List<Object> fields = t.getAll();
+ Object[] expected = results.get((String)fields.get(0));
+ int i = 0;
+ for (Object field : fields) {
+ if(i == 1) {
+ // ignore the second field which is a bag
+ // for comparison here
+ continue;
+ }
+ assertEquals(expected[i++], field);
+ }
+ }
+ Util.deleteFile(cluster, "distinctNoCombinerInput.txt");
+
+ }
+
+ @Test
+ public void testForEachNoCombiner() throws Exception {
+ // test that combiner is NOT invoked when
+ // one of the elements in the foreach generate
+ // has a foreach in the plan without a distinct agg
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a by name;");
+ pigServer.registerQuery("c = foreach b {" +
+ " z = a.age;" +
+ " generate group, z, SUM(a.age), SUM(a.gpa);};");
+
+ // make sure there is a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+
+ HashMap<String, Object[]> results = new HashMap<String, Object[]>();
+ results.put("pig1", new Object[] {"pig1","bag-place-holder",75L,9.4});
+ results.put("pig2", new Object[] {"pig2","bag-place-holder",48L,7.8});
+ results.put("pig5", new Object[] {"pig5","bag-place-holder",45L,2.4});
+ Iterator<Tuple> it = pigServer.openIterator("c");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ List<Object> fields = t.getAll();
+ Object[] expected = results.get((String)fields.get(0));
+ int i = 0;
+ for (Object field : fields) {
+ if(i == 1) {
+ // ignore the second field which is a bag
+ // for comparison here
+ continue;
+ }
+ assertEquals(expected[i++], field);
+ }
+ }
+ Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+
}
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/Util.java?rev=731777&r1=731776&r2=731777&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/Util.java Mon Jan 5 15:57:28 2009
@@ -29,6 +29,9 @@
import static java.util.regex.Matcher.quoteReplacement;
import junit.framework.Assert;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.*;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -118,6 +121,16 @@
return b;
}
+ static public<T> DataBag createBagOfOneColumn(T[] input) throws ExecException {
+ DataBag result = mBagFactory.newDefaultBag();
+ for (int i = 0; i < input.length; i++) {
+ Tuple t = mTupleFactory.newTuple(1);
+ t.set(0, input[i]);
+ result.add(t);
+ }
+ return result;
+ }
+
static public Map<Object, Object> createMap(String[] contents)
{
Map<Object, Object> m = new HashMap<Object, Object>();
@@ -179,6 +192,44 @@
pw.close();
return f;
}
+
+ /**
+ * Helper to create a dfs file on the Minicluster DFS with given
+ * input data for use in test cases.
+ *
+ * @param miniCluster reference to the Minicluster where the file should be created
+ * @param fileName pathname of the file to be created
+ * @param inputData input for test cases, each string in inputData[] is written
+ * on one line
+ * @throws IOException
+ */
+ static public void createInputFile(MiniCluster miniCluster, String fileName,
+ String[] inputData)
+ throws IOException {
+ FileSystem fs = miniCluster.getFileSystem();
+ if(fs.exists(new Path(fileName))) {
+ throw new IOException("File " + fileName + " already exists on the
minicluster");
+ }
+ FSDataOutputStream stream = fs.create(new Path(fileName));
+ PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
+ for (int i=0; i<inputData.length; i++){
+ pw.println(inputData[i]);
+ }
+ pw.close();
+ }
+
+ /**
+ * Helper to remove a dfs file from the minicluster DFS
+ *
+ * @param miniCluster reference to the Minicluster where the file should be deleted
+ * @param fileName pathname of the file to be deleted
+ * @throws IOException
+ */
+ static public void deleteFile(MiniCluster miniCluster, String fileName)
+ throws IOException {
+ FileSystem fs = miniCluster.getFileSystem();
+ fs.delete(new Path(fileName), true);
+ }
/**
* Helper function to check if the result of a Pig Query is in line with