Author: olga
Date: Fri Sep 26 14:20:31 2008
New Revision: 699504

URL: http://svn.apache.org/viewvc?rev=699504&view=rev
Log:
PIG-443: illustrate

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/PigServer.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== 
--- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Fri Sep 26 14:20:31 2008 @@ -253,3 +253,5 @@ PIG-462: LIMIT N should create one output file with N rows (shravanmn via olgan) + + PIG-443: Illustrate for the Types branch (shubham via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 26 14:20:31 2008 @@ -41,6 +41,7 @@ import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; @@ -65,6 +66,7 @@ import org.apache.pig.impl.util.PropertiesUtil; import org.apache.pig.impl.logicalLayer.LODefine; import org.apache.pig.impl.logicalLayer.LOStore; +import org.apache.pig.pen.ExampleGenerator; /** * @@ -610,6 +612,21 @@ // pigContext.getExecutionEngine().reclaimScope(this.scope); } + public Map<LogicalOperator, DataBag> getExamples(String alias) { + //LogicalPlan plan = aliases.get(aliasOp.get(alias)); + LogicalPlan plan = null; + try { + plan = clonePlan(alias); + } catch (IOException e) { + //Since the original script is parsed anyway, there should not be an + //error in this parsing. The only reason there can be an error is when + //the files being loaded in load don't exist anymore. 
+ e.printStackTrace(); + } + ExampleGenerator exgen = new ExampleGenerator(plan, pigContext); + return exgen.getExamples(); + } + private ExecJob execute(String alias) throws FrontendException, ExecException { ExecJob job = null; // lp.explain(System.out, System.err); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Sep 26 14:20:31 2008 @@ -59,15 +59,15 @@ public class LogToPhyTranslationVisitor extends LOVisitor { - Map<LogicalOperator, PhysicalOperator> LogToPhyMap; + protected Map<LogicalOperator, PhysicalOperator> LogToPhyMap; Random r = new Random(); - Stack<PhysicalPlan> currentPlans; + protected Stack<PhysicalPlan> currentPlans; - PhysicalPlan currentPlan; + protected PhysicalPlan currentPlan; - NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator(); + protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator(); private Log log = LogFactory.getLog(getClass()); Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Sep 26 14:20:31 2008 @@ -34,6 +34,7 @@ import org.apache.pig.impl.plan.Operator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.LineageTracer; /** * @@ -114,6 +115,8 @@ static protected DataBag dummyBag; static protected Map dummyMap; + + protected LineageTracer lineageTracer; public PhysicalOperator(OperatorKey k) { this(k, -1, null); @@ -134,6 +137,10 @@ res = new Result(); } + public void setLineageTracer(LineageTracer lineage) { + this.lineageTracer = lineage; + } + public int getRequestedParallelism() { return requestedParallelism; } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Sep 26 14:20:31 2008 @@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import 
org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.*; import org.apache.pig.impl.plan.PlanVisitor; import org.apache.pig.impl.plan.PlanWalker; import org.apache.pig.impl.plan.VisitorException; @@ -219,4 +221,15 @@ } + public void visitCogroup(POCogroup cogroup) { + // TODO Auto-generated method stub + + } + + public void visitSplit(org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit split) { + // TODO Auto-generated method stub + + } + + } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Sep 26 14:20:31 2008 @@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.ExampleTuple; /** * This is an implementation of the Filter operator. 
It has an Expression Plan @@ -150,6 +151,11 @@ return res; if (res.result != null && (Boolean) res.result == true) { + if(lineageTracer != null) { + ExampleTuple tIn = (ExampleTuple) inp.result; + lineageTracer.insert(tIn); + lineageTracer.union(tIn, tIn); + } return inp; } } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Sep 26 14:20:31 2008 @@ -23,6 +23,7 @@ import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.ExampleTuple; public class POForEach extends PhysicalOperator { @@ -49,6 +50,8 @@ //This is the template whcih contains tuples and is flattened out in CreateTuple() to generate the final output Object[] data = null; + ExampleTuple tIn = null; + public POForEach(OperatorKey k) { this(k,-1,null,null); } @@ -122,10 +125,17 @@ if(processingPlan){ while(true) { res = processPlan(); - if(res.returnStatus==POStatus.STATUS_OK){ + if(res.returnStatus==POStatus.STATUS_OK) { + if(lineageTracer != null && res.result != null) { + ExampleTuple tOut = new ExampleTuple((Tuple) res.result); + tOut.synthetic = tIn.synthetic; + lineageTracer.insert(tOut); + lineageTracer.union(tOut, tIn); + res.result = tOut; + } return res; } - if(res.returnStatus==POStatus.STATUS_EOP){ + if(res.returnStatus==POStatus.STATUS_EOP) { 
processingPlan = false; break; } @@ -156,6 +166,16 @@ res = processPlan(); processingPlan = true; + + if(lineageTracer != null && res.result != null) { + //we check for res.result since that can also be null in the case of flatten + tIn = (ExampleTuple) inp.result; + ExampleTuple tOut = new ExampleTuple((Tuple) res.result); + tOut.synthetic = tIn.synthetic; + lineageTracer.insert(tOut); + lineageTracer.union(tOut, tIn); + res.result = tOut; + } return res; } @@ -323,6 +343,11 @@ } else out.append(in); } + + if(lineageTracer != null) { + ExampleTuple tOut = new ExampleTuple(); + tOut.reference(out); + } return out; } Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Fri Sep 26 14:20:31 2008 @@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.ExampleTuple; /** * The load operator which is used in two ways: @@ -135,6 +136,10 @@ } else res.returnStatus = POStatus.STATUS_OK; + if(lineageTracer != null) { + ExampleTuple tOut = new ExampleTuple((Tuple) res.result); + res.result = tOut; + } } catch (IOException e) { log.error("Received error from loader function: " + e); return res; Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Sep 26 14:20:31 2008 @@ -272,6 +272,10 @@ } if (it.hasNext()) { res.result = it.next(); + if(lineageTracer != null) { + lineageTracer.insert((Tuple) res.result); + lineageTracer.union((Tuple)res.result, (Tuple)res.result); + } res.returnStatus = POStatus.STATUS_OK; } else { res.returnStatus = POStatus.STATUS_EOP; Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Fri Sep 26 14:20:31 2008 @@ -29,6 +29,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.pen.util.ExampleTuple; /** * The union 
operator that combines the two inputs into a single @@ -157,6 +158,11 @@ res.returnStatus = POStatus.STATUS_OK; detachInput(); nextReturnEOP = true ; + if(lineageTracer != null) { + ExampleTuple tOut = (ExampleTuple) res.result; + lineageTracer.insert(tOut); + lineageTracer.union(tOut, tOut); + } return res; } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Fri Sep 26 14:20:31 2008 @@ -17,6 +17,7 @@ */ package org.apache.pig.impl.logicalLayer; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; @@ -55,4 +56,23 @@ lpp.print(out); } + +// public String toString() { +// if(mOps.size() == 0) +// return "Empty Plan!"; +// else{ +// ByteArrayOutputStream baos = new ByteArrayOutputStream(); +// PrintStream ps = new PrintStream(baos); +// try { +// explain(baos, ps); +// } catch (VisitorException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } catch (IOException e) { +// // TODO Auto-generated catch block +// e.printStackTrace(); +// } +// return baos.toString(); +// } +// } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original) +++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Fri Sep 26 14:20:31 2008 @@ -73,7 +73,7 @@ return new DependencyOrderWalker<O, P>(plan); } - private void doAllPredecessors(O node, + protected void doAllPredecessors(O node, Set<O> seen, Collection<O> fifo) throws VisitorException { if (!seen.contains(node)) { Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java Fri Sep 26 14:20:31 2008 @@ -19,6 +19,7 @@ package org.apache.pig.impl.util; import java.util.*; +import java.util.Map.Entry; public class IdentityHashSet<E> implements Set<E> { @@ -121,6 +122,23 @@ throw new UnsupportedOperationException("Unsupported operation on IdentityHashSet."); } - + public String toString() { + StringBuffer buf = new StringBuffer(); + buf.append("{"); + + Iterator<Entry<E, Object>> i = map.entrySet().iterator(); + boolean hasNext = i.hasNext(); + while (hasNext) { + Entry<E, Object> e = i.next(); + E key = e.getKey(); + buf.append(key); + hasNext = i.hasNext(); + if (hasNext) + buf.append(", "); + } + + buf.append("}"); + return buf.toString(); + } } Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original) +++ 
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Fri Sep 26 14:20:31 2008 @@ -273,6 +273,11 @@ System.out.println(t); } } + + protected void processIllustrate(String alias) throws IOException + { + mPigServer.getExamples(alias); + } protected void processKill(String jobid) throws IOException { Modified: incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=699504&r1=699503&r2=699504&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original) +++ incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Sep 26 14:20:31 2008 @@ -95,6 +95,8 @@ abstract protected void processPig(String cmd) throws IOException; abstract protected void processRemove(String path) throws IOException; + + abstract protected void processIllustrate(String alias) throws IOException; static String unquote(String s) { @@ -137,6 +139,7 @@ TOKEN: {<REGISTER: "register">} TOKEN: {<REMOVE: "rm">} TOKEN: {<SET: "set">} +TOKEN: {<ILLUSTRATE: "illustrate">} // internal use commands TOKEN: {<SCRIPT_DONE: "scriptDone">} @@ -379,6 +382,10 @@ t1 = <IDENTIFIER> {processDump(t1.image);} | + <ILLUSTRATE> + t1 = <IDENTIFIER> + {processIllustrate(t1.image);} + | <DESCRIBE> t1 = <IDENTIFIER> {processDescribe(t1.image);} Added: incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java?rev=699504&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java (added) +++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java Fri Sep 26 14:20:31 2008 @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.logicalLayer.LogicalOperator; +import org.junit.Before; +import org.junit.Test; + +public class TestExampleGenerator extends TestCase { + + MiniCluster cluster = MiniCluster.buildCluster(); + PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster + .getProperties()); + + Random rand = new Random(); + int MAX = 100; + String A, B; + + { + try { + pigContext.connect(); + } catch (ExecException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Before + public void setUp() throws Exception { + File fileA, fileB; + + fileA = 
File.createTempFile("dataA", ".dat"); + fileB = File.createTempFile("dataB", ".dat"); + + writeData(fileA); + writeData(fileB); + + A = "'" + FileLocalizer.hadoopify(fileA.toString(), pigContext) + "'"; + B = "'" + FileLocalizer.hadoopify(fileB.toString(), pigContext) + "'"; + System.out.println("A : " + A + "\n" + "B : " + B); + System.out.println("Test data created."); + + } + + private void writeData(File dataFile) throws Exception { + // File dataFile = File.createTempFile(name, ".dat"); + FileOutputStream dat = new FileOutputStream(dataFile); + + Random rand = new Random(); + + for (int i = 0; i < MAX; i++) + dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + "\n") + .getBytes()); + + dat.close(); + } + + @Test + public void testFilter() throws Exception { + + PigServer pigserver = new PigServer(pigContext); + + String query = "A = load " + A + + " using PigStorage() as (x : int, y : int);\n"; + pigserver.registerQuery(query); + query = "B = filter A by x > 10;"; + pigserver.registerQuery(query); + Map<LogicalOperator, DataBag> derivedData = pigserver.getExamples("B"); + + assertTrue(derivedData != null); + + } + + @Test + public void testForeach() throws ExecException, IOException { + PigServer pigServer = new PigServer(pigContext); + + pigServer.registerQuery("A = load " + A + + " using PigStorage() as (x : int, y : int);"); + pigServer.registerQuery("B = foreach A generate x + y as sum;"); + + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B"); + + assertTrue(derivedData != null); + } + + @Test + public void testJoin() throws IOException, ExecException { + PigServer pigServer = new PigServer(pigContext); + pigServer.registerQuery("A1 = load " + A + " as (x, y);"); + pigServer.registerQuery("B1 = load " + B + " as (x, y);"); + + pigServer.registerQuery("E = join A1 by x, B1 by x;"); + + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("E"); + + assertTrue(derivedData != null); + } + + @Test + public void 
testCogroupMultipleCols() throws Exception { + + PigServer pigServer = new PigServer(pigContext); + pigServer.registerQuery("A = load " + A + " as (x, y);"); + pigServer.registerQuery("B = load " + B + " as (x, y);"); + pigServer.registerQuery("C = cogroup A by (x, y), B by (x, y);"); + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C"); + + assertTrue(derivedData != null); + } + + @Test + public void testCogroup() throws Exception { + PigServer pigServer = new PigServer(pigContext); + pigServer.registerQuery("A = load " + A + " as (x, y);"); + pigServer.registerQuery("B = load " + B + " as (x, y);"); + pigServer.registerQuery("C = cogroup A by x, B by x;"); + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C"); + + assertTrue(derivedData != null); + } + + @Test + public void testGroup() throws Exception { + PigServer pigServer = new PigServer(pigContext); + pigServer.registerQuery("A = load " + A.toString() + " as (x, y);"); + pigServer.registerQuery("B = group A by x;"); + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B"); + + assertTrue(derivedData != null); + + } + + @Test + public void testUnion() throws Exception { + PigServer pigServer = new PigServer(pigContext); + pigServer.registerQuery("A = load " + A.toString() + " as (x, y);"); + pigServer.registerQuery("B = load " + B.toString() + " as (x, y);"); + pigServer.registerQuery("C = union A, B;"); + Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C"); + + assertTrue(derivedData != null); + } + +} Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java?rev=699504&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java (added) +++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java Fri Sep 26 14:20:31 2008 @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.ExecType; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import 
org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.PlanSetter;
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.FunctionalLogicalOptimizer;
import org.junit.Test;

/**
 * Exercises a three-way SPLIT in local execution mode: a query is parsed into
 * a logical plan, translated with {@link LocalLogToPhyTranslationVisitor},
 * and every leaf of the resulting physical plan is drained and checked.
 */
public class TestLocalPOSplit extends TestCase {

    Random r = new Random();

    Log log = LogFactory.getLog(getClass());

    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());

    // Bookkeeping maps handed to LogicalPlanBuilder.parse(); shared by every
    // plan built through this test instance.
    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
    Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
    Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();

    /**
     * Writes 100 random single-digit lines to a temp file, splits them on the
     * values 2, 9 and 7, and verifies that each leaf of the physical plan
     * emits exactly one of those values and that no two leaves emit the same
     * one.
     */
    @Test
    public void testSplit() throws IOException, VisitorException, ExecException {
        pigContext.connect();
        File datFile = File.createTempFile("tempA", ".dat");
        // The file has to outlive plan execution, so defer removal to JVM exit.
        datFile.deleteOnExit();

        FileOutputStream dat = new FileOutputStream(datFile);
        try {
            for (int i = 0; i < 100; i++) {
                String str = r.nextInt(10) + "\n";
                // Explicit charset so the bytes written match the expected
                // keys below regardless of the platform default.
                dat.write(str.getBytes("UTF-8"));
            }
        } finally {
            // Release the handle even when a write fails.
            dat.close();
        }

        String query = "split (load '" + datFile.getAbsolutePath()
                + "') into a if $0 == 2, b if $0 == 9, c if $0 == 7 ;";

        LogicalPlan plan = buildPlan(query);
        PhysicalPlan pp = buildPhysicalPlan(plan);

        DataBag[] bag = new DataBag[pp.getLeaves().size()];

        for (int i = 0; i < bag.length; i++) {
            bag[i] = BagFactory.getInstance().newDefaultBag();
        }

        // Drain every leaf until it reports end-of-processing, keeping only
        // the results flagged STATUS_OK.
        for (int i = 0; i < pp.getLeaves().size(); i++) {
            Tuple t = null;
            for (Result res = pp.getLeaves().get(i).getNext(t);
                    res.returnStatus != POStatus.STATUS_EOP;
                    res = pp.getLeaves().get(i).getNext(t)) {
                if (res.returnStatus == POStatus.STATUS_OK) {
                    bag[i].add((Tuple) res.result);
                }
            }
        }

        // Depending on how the "maps" in the physical plan are
        // built the leaves could be in different order between different runs.
        // lets test the first tuple out of each leaf to
        // 1) ensure the value was not seen before
        // 2) all the remaining tuples from that leaf are same
        // as the first value
        Map<DataByteArray, Boolean> seen = new HashMap<DataByteArray, Boolean>();
        seen.put(new DataByteArray("7".getBytes("UTF-8")), false);
        seen.put(new DataByteArray("9".getBytes("UTF-8")), false);
        seen.put(new DataByteArray("2".getBytes("UTF-8")), false);

        for (int i = 0; i < bag.length; i++) {
            DataByteArray firstValue = null;
            Iterator<Tuple> it = bag[i].iterator();
            if (it.hasNext()) {
                // check that we have not seen this value before
                Tuple t = it.next();
                System.out.println(t);
                firstValue = (DataByteArray) t.get(0);
                // Report a value outside {2, 7, 9} as an assertion failure
                // rather than a NullPointerException from unboxing.
                assertTrue("Unexpected value in split output: " + firstValue,
                        seen.containsKey(firstValue));
                assertFalse("Value emitted by more than one leaf: " + firstValue,
                        seen.get(firstValue));
                seen.put(firstValue, true);
            }
            // check that all remaining tuples from this
            // leaf have the same values as the first value
            while (it.hasNext()) {
                Tuple t = it.next();
                System.out.println(t);
                assertEquals(firstValue, t.get(0));
            }
        }
    }

    /**
     * Translates a logical plan into a physical plan using the local-mode
     * translation visitor configured with this test's {@code pigContext}.
     */
    public PhysicalPlan buildPhysicalPlan(LogicalPlan lp)
            throws VisitorException {
        LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor(lp);
        visitor.setPigContext(pigContext);
        visitor.visit();
        return visitor.getPhysicalPlan();
    }

    /**
     * Builds a logical plan for {@code query} using the class loader of
     * {@link LogicalPlanBuilder}.
     */
    public LogicalPlan buildPlan(String query) {
        return buildPlan(query, LogicalPlanBuilder.class.getClassLoader());
    }

    /**
     * Parses {@code query} into a logical plan, checks that every root is a
     * load or define operator, dumps the roots and their direct successors to
     * stderr, and runs the plan through {@link #refineLogicalPlan}.
     * Any exception raised on the way fails the current test.
     *
     * @param query Pig Latin statement to parse
     * @param cldr  class loader installed on {@link LogicalPlanBuilder}
     * @return the refined plan; {@code null} is only reachable syntactically,
     *         because {@code fail(..)} throws before the final return
     */
    public LogicalPlan buildPlan(String query, ClassLoader cldr) {
        LogicalPlanBuilder.classloader = cldr;

        LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);

        try {
            LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
                    logicalOpTable, aliasOp);

            for (LogicalOperator op : lp.getRoots()) {
                if (!(op instanceof LOLoad) && !(op instanceof LODefine)) {
                    throw new Exception(
                            "Cannot have a root that is not the load or define operator. Found "
                                    + op.getClass().getName());
                }
            }

            System.err.println("Query: " + query);

            // Just the top level roots and their children
            // Need a recursive one to travel down the tree
            for (LogicalOperator op : lp.getRoots()) {
                System.err.println("Logical Plan Root: "
                        + op.getClass().getName() + " object " + op);

                List<LogicalOperator> listOp = lp.getSuccessors(op);
                if (null != listOp) {
                    for (LogicalOperator lop : listOp) {
                        System.err.println("Successor: "
                                + lop.getClass().getName() + " object " + lop);
                    }
                }
            }
            lp = refineLogicalPlan(lp);
            assertNotNull(lp);
            return lp;
        } catch (IOException e) {
            fail("IOException: " + e.getMessage());
        } catch (Exception e) {
            log.error(e);
            fail(e.getClass().getName() + ": " + e.getMessage() + " -- "
                    + query);
        }
        return null;
    }

    /**
     * Runs the plan setter, the validation executor and the functional
     * optimizer over {@code plan}. Failures in any step are logged and
     * otherwise ignored, so the (possibly only partially refined) plan is
     * always returned.
     */
    private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
        PlanSetter ps = new PlanSetter(plan);
        try {
            ps.visit();
        } catch (VisitorException e) {
            log.error("PlanSetter failed on the logical plan", e);
        }

        // run through validator
        CompilationMessageCollector collector = new CompilationMessageCollector();
        try {
            LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
                    plan, pigContext);
            validator.validate(plan, collector);

            FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
                    plan);
            optimizer.optimize();
        } catch (FrontendException fe) {
            // Best effort: keep going with the plan as it is, but leave a
            // trace of what went wrong instead of dropping it silently.
            log.error("Validation or optimization of the logical plan failed", fe);
        }

        return plan;
    }
}
 Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=699504&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Fri Sep 26 14:20:31 2008 @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.pig.test;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;

/**
 * Tests {@link POCogroup} by wiring it by hand to {@link PORead} sources
 * through {@link POLocalRearrange} operators and comparing the tuples it
 * emits against a hand-built expectation.
 */
public class TestPOCogroup extends TestCase {
    Random r = new Random();

    /**
     * Cogroups two single-column bags on column 0 and expects one output
     * tuple per key (1 and 2), each carrying one bag per input.
     */
    public void testCogroup2Inputs() throws Exception {
        DataBag bag1 = bagOf(tupleOf(2), tupleOf(1), tupleOf(1));
        DataBag bag2 = bagOf(tupleOf(2), tupleOf(2), tupleOf(1));

        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
        inputs.add(rearrangeOn(bag1, 0, 0));
        inputs.add(rearrangeOn(bag2, 0, 1));

        POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
                inputs);

        Tuple one = tupleOf(1);
        Tuple two = tupleOf(2);
        List<Tuple> expected = new LinkedList<Tuple>();
        // key 1: twice in the first input, once in the second
        expected.add(tupleOf(1, bagOf(one, one), bagOf(one)));
        // key 2: once in the first input, twice in the second
        expected.add(tupleOf(2, bagOf(two), bagOf(two, two)));

        assertEmitsExactly(poc, expected);
    }

    /**
     * Groups a single two-column bag on column 1 and expects one output tuple
     * per distinct value of that column (2 and 4).
     */
    public void testCogroup1Input() throws ExecException {
        Tuple t = tupleOf(1, 2);
        Tuple t2 = tupleOf(2, 2);
        Tuple t3 = tupleOf(3, 4);
        DataBag input = bagOf(t, t, t2, t3);

        List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
        inputs.add(rearrangeOn(input, 1, 0));

        List<Tuple> expected = new LinkedList<Tuple>();
        expected.add(tupleOf(2, bagOf(t, t, t2)));
        expected.add(tupleOf(4, bagOf(t3)));

        POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
                inputs);

        assertEmitsExactly(poc, expected);
    }

    /** Creates a tuple holding the given fields in order. */
    private static Tuple tupleOf(Object... fields) {
        Tuple tuple = TupleFactory.getInstance().newTuple();
        for (Object field : fields) {
            tuple.append(field);
        }
        return tuple;
    }

    /** Creates a default bag holding the given tuples in order. */
    private static DataBag bagOf(Tuple... tuples) {
        DataBag bag = BagFactory.getInstance().newDefaultBag();
        for (Tuple tuple : tuples) {
            bag.add(tuple);
        }
        return bag;
    }

    /**
     * Builds a PORead over {@code source} feeding a POLocalRearrange whose
     * single plan projects column {@code keyColumn} as an INTEGER, and tags
     * the rearrange with {@code index}.
     */
    private POLocalRearrange rearrangeOn(DataBag source, int keyColumn,
            int index) throws ExecException {
        PORead reader = new PORead(new OperatorKey("", r.nextLong()), source);
        List<PhysicalOperator> readerAsInput = new LinkedList<PhysicalOperator>();
        readerAsInput.add(reader);

        POProject project = new POProject(new OperatorKey("", r.nextLong()),
                -1, keyColumn);
        project.setResultType(DataType.INTEGER);
        PhysicalPlan keyPlan = new PhysicalPlan();
        keyPlan.add(project);
        List<PhysicalPlan> keyPlans = new LinkedList<PhysicalPlan>();
        keyPlans.add(keyPlan);

        POLocalRearrange rearrange = new POLocalRearrange(new OperatorKey("",
                r.nextLong()), -1, readerAsInput);
        rearrange.setPlans(keyPlans);
        rearrange.setIndex(index);
        return rearrange;
    }

    /**
     * Drains {@code cogroup} until it reports STATUS_EOP and asserts that the
     * emitted tuples are exactly those in {@code expected}, in any order.
     * Each expected tuple may be matched only once, so a duplicated output
     * tuple fails the test.
     */
    private static void assertEmitsExactly(POCogroup cogroup,
            List<Tuple> expected) throws ExecException {
        List<Tuple> unmatched = new LinkedList<Tuple>(expected);
        // NOTE(review): the argument is presumably only used to select the
        // getNext(Tuple) overload - confirm against POCogroup.
        Tuple marker = TupleFactory.getInstance().newTuple();
        int produced = 0;
        for (Result res = cogroup.getNext(marker);
                res.returnStatus != POStatus.STATUS_EOP;
                res = cogroup.getNext(marker)) {
            System.out.println(res.result);
            produced++;
            assertTrue("Unexpected or duplicated output: " + res.result,
                    unmatched.remove(res.result));
        }
        assertEquals(expected.size(), produced);
    }

}