Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java Thu Feb 18 22:20:07 2010 @@ -56,6 +56,11 @@ super(); } + @Override + public boolean isEqual(OperatorPlan other) { + return false; + } + } private static class OP extends Operator { @@ -66,6 +71,11 @@ public void accept(PlanVisitor v) { } + + @Override + public boolean isEqual(Operator operator) { + return false; + } } private static class OP_Load extends OP { @@ -100,6 +110,7 @@ OperatorPlan plan = null; + Operator join; public void setUp() { plan = new SillyPlan(); @@ -134,7 +145,47 @@ plan.connect(t1, f3); plan.connect(t1, f4); plan.connect(f3, s1); - plan.connect(f4, s2); + plan.connect(f4, s2); + + join = j1; + } + + + public void testMultiNode() throws Exception { + // load --|-join - filter - filter - split |- filter - store + // load --| + // load -- filter-| + Operator l3 = new OP_Load("p3", plan); + Operator f5 = new OP_Filter("f5", plan); + plan.add(l3); + plan.add(f5); + plan.connect(l3, f5); + + plan.connect(f5, join); + + + OperatorPlan pattern = new SillyPlan(); + Operator op1 = new OP_Load("mmm1", pattern); + Operator op2 = new OP_Filter("mmm2", pattern); + Operator op3 = new OP_Join("mmm3", pattern); + pattern.add(op1); + pattern.add(op2); + pattern.add(op3); + pattern.connect(op1, op3); + pattern.connect(op2, op3); + + Rule r = new SillyRule("basic", pattern); + List<OperatorPlan> l = r.match(plan); + assertEquals(1, l.size()); + OperatorPlan match = l.get(0); + assertEquals(match.size(), 3); + assertEquals(match.getSinks().size(), 1); + assertEquals(match.getSinks().get(0), join); + + assertEquals(match.getSources().size(), 2); + assertTrue(match.getSources().get(0).getClass().equals(OP_Load.class) || match.getSources().get(0).equals(f5) ); + assertTrue(match.getSources().get(1).getClass().equals(OP_Load.class) || match.getSources().get(1).equals(f5) ); + assertNotSame(match.getSources().get(0), match.getSources().get(1)); } public void testSingleNodeMatch() { @@ -146,11 +197,11 @@ List<OperatorPlan> l = r.match(plan); assertEquals(l.size(), 2); - Operator m1 = l.get(0).getRoots().get(0); + Operator m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2")); assertEquals(l.get(0).size(), 1); - Operator m2 = l.get(1).getRoots().get(0); + Operator m2 = l.get(1).getSources().get(0); assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2")); assertEquals(l.get(1).size(), 1); assertNotSame(m1.getName(), m2.getName()); @@ -162,12 +213,12 @@ l = r.match(plan); assertEquals(l.size(), 4); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") || m1.getName().equals("f3") || m1.getName().equals("f4")); assertEquals(l.get(0).size(), 1); - m2 = l.get(1).getRoots().get(0); + m2 = l.get(1).getSources().get(0); assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") || m1.getName().equals("f3") || m1.getName().equals("f4")); assertEquals(l.get(1).size(), 1); @@ -180,11 +231,11 @@ l = r.match(plan); assertEquals(l.size(), 2); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("s1") || m1.getName().equals("s2")); assertEquals(l.get(0).size(), 1); - m2 = l.get(1).getRoots().get(0); + m2 = l.get(1).getSources().get(0); assertTrue(m2.getName().equals("s1") || m2.getName().equals("s2")); assertEquals(l.get(1).size(), 1); assertNotSame(m1.getName(), m2.getName()); @@ -196,7 +247,7 @@ l = r.match(plan); assertEquals(l.size(), 1); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("t1")); assertEquals(l.get(0).size(), 1); @@ -207,7 +258,7 @@ l = r.match(plan); assertEquals(l.size(), 1); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("j1")); assertEquals(l.get(0).size(), 1); @@ -223,13 +274,13 @@ List<OperatorPlan> l = r.match(plan); assertEquals(l.size(), 1); - assertEquals(l.get(0).getRoots().size(), 2); - assertEquals(l.get(0).getLeaves().size(), 2); + assertEquals(l.get(0).getSources().size(), 2); + assertEquals(l.get(0).getSinks().size(), 2); assertEquals(l.get(0).size(), 2); - Operator m1 = l.get(0).getRoots().get(0); + Operator m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2")); - Operator m2 = l.get(0).getRoots().get(1); + Operator m2 = l.get(0).getSources().get(1); assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2")); assertNotSame(m1.getName(), m2.getName()); @@ -246,13 +297,13 @@ l = r.match(plan); assertEquals(l.size(), 1); - assertEquals(l.get(0).getRoots().size(), 1); - assertEquals(l.get(0).getLeaves().size(), 1); + assertEquals(l.get(0).getSources().size(), 1); + assertEquals(l.get(0).getSinks().size(), 1); assertEquals(l.get(0).size(), 2); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("j1")); - m2 = l.get(0).getLeaves().get(0); + m2 = l.get(0).getSinks().get(0); assertTrue(m2.getName().equals("f1")); @@ -268,8 +319,8 @@ l = r.match(plan); assertEquals(2, l.size()); - assertEquals(l.get(0).getRoots().size(), 1); - assertEquals(l.get(0).getLeaves().size(), 1); + assertEquals(l.get(0).getSources().size(), 1); + assertEquals(l.get(0).getSinks().size(), 1); // search for 2 loads, then join pattern = new SillyPlan(); @@ -301,15 +352,15 @@ l = r.match(plan); assertEquals(1, l.size()); - assertEquals(l.get(0).getRoots().size(), 1); - assertEquals(l.get(0).getLeaves().size(), 2); + assertEquals(l.get(0).getSources().size(), 1); + assertEquals(l.get(0).getSinks().size(), 2); assertEquals(l.get(0).size(), 3); - m1 = l.get(0).getRoots().get(0); + m1 = l.get(0).getSources().get(0); assertTrue(m1.getName().equals("t1")); - m2 = l.get(0).getLeaves().get(0); + m2 = l.get(0).getSinks().get(0); assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4")); - m2 = l.get(0).getLeaves().get(1); + m2 = l.get(0).getSinks().get(1); assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4")); }
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java Thu Feb 18 22:20:07 2010 @@ -811,7 +811,7 @@ PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); PigContext context = server.getPigContext(); - String strCmd = "register 'pig.jar'\n"; + String strCmd = "register 'pig-withouthadoop.jar'\n"; ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); InputStreamReader reader = new InputStreamReader(cmd); @@ -819,14 +819,14 @@ Grunt grunt = new Grunt(new BufferedReader(reader), context); grunt.exec(); - assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig.jar"))); + assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar"))); } public void testRegisterWithoutQuotes() throws Throwable { PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); PigContext context = server.getPigContext(); - String strCmd = "register pig.jar\n"; + String strCmd = "register pig-withouthadoop.jar\n"; ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes()); InputStreamReader reader = new InputStreamReader(cmd); @@ -834,6 +834,6 @@ Grunt grunt = new Grunt(new BufferedReader(reader), context); grunt.exec(); - assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig.jar"))); + assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar"))); } } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java Thu Feb 18 22:20:07 2010 @@ -98,6 +98,40 @@ } @Test + public void testJoinWithMissingFieldsInTuples() throws IOException{ + + setUp(ExecType.MAPREDUCE); + String[] input1 = { + "ff ff ff", + "", + "", + "", + "", + "ff ff ff", + "", + "" + }; + String[] input2 = { + "", + "", + "", + "", + "" + }; + + String firstInput = createInputFile(ExecType.MAPREDUCE, "a.txt", input1); + String secondInput = createInputFile(ExecType.MAPREDUCE, "b.txt", input2); + String script = "a = load 'a.txt' using PigStorage(' ');" + + "b = load 'b.txt' using PigStorage('\u0001');" + + "c = join a by $0, b by $0;"; + Util.registerMultiLineQuery(pigServer, script); + Iterator<Tuple> it = pigServer.openIterator("c"); + assertFalse(it.hasNext()); + deleteInputFile(ExecType.MAPREDUCE, firstInput); + deleteInputFile(ExecType.MAPREDUCE, secondInput); + } + + @Test public void testJoinUnkownSchema() throws Exception { // If any of the input schema is unknown, the resulting schema should be unknown as well for (ExecType execType : execTypes) { Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java Thu Feb 18 22:20:07 2010 @@ -195,22 +195,29 @@ try { PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); w.println("10\t2\t3"); + w.println("10\t4\t5"); + w.println("20\t3000\t2"); + w.println("20\t4000\t3"); w.println("20\t3\t"); + w.println("21\t4\t"); + w.println("22\t5\t"); w.close(); Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); PigServer myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);"); - myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);"); + myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : 0);"); + myPig.registerQuery("res = FOREACH grp GENERATE group, SUM(data.a1), SUM(data.a2);"); List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { - "(10,{(10,2,3)})", - "(null,{(20,3,null)})" + new String[] { + "(0,7000.0,5.0)", + "(10,6.0,8.0)", + "(null,12.0,null)" }); - Iterator<Tuple> iter = myPig.openIterator("grp"); + Iterator<Tuple> iter = myPig.openIterator("res"); int counter = 0; while (iter.hasNext()) { assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=911616&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (added) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Thu Feb 18 22:20:07 2010 @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + + +import java.util.ArrayList; +import java.util.List; + +import org.apache.pig.FuncSpec; +import org.apache.pig.data.DataType; +import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor; +import org.apache.pig.experimental.logical.expression.AndExpression; +import org.apache.pig.experimental.logical.expression.CastExpression; +import org.apache.pig.experimental.logical.expression.ConstantExpression; +import org.apache.pig.experimental.logical.expression.EqualExpression; +import org.apache.pig.experimental.logical.expression.LogicalExpression; +import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan; +import org.apache.pig.experimental.logical.expression.ProjectExpression; +import org.apache.pig.experimental.logical.optimizer.UidStamper; +import org.apache.pig.experimental.logical.relational.LOForEach; +import org.apache.pig.experimental.logical.relational.LOGenerate; +import org.apache.pig.experimental.logical.relational.LOInnerLoad; +import org.apache.pig.experimental.logical.relational.LOJoin; +import org.apache.pig.experimental.logical.relational.LOLoad; +import org.apache.pig.experimental.logical.relational.LOStore; +import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; +import org.apache.pig.experimental.logical.relational.LogicalSchema; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.logicalLayer.LogicalPlan; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.test.utils.LogicalPlanTester; + +import junit.framework.TestCase; + +public class TestLogicalPlanMigrationVisitor extends TestCase { + + public void testSimplePlan() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt';"); + lpt.buildPlan("b = filter a by $0==NULL;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + // check basics + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + assertEquals(3, newPlan.size()); + assertEquals(newPlan.getSources().size(), 1); + + // check load + LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class); + + // check filter + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class); + + LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan(); + + EqualExpression eq = (EqualExpression)exp.getSources().get(0); + assertEquals(eq.getLhs().getClass(), ProjectExpression.class); + assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0); + assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0); + + assertEquals(eq.getRhs().getClass(), ConstantExpression.class); + + // check store + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class); + } + + public void testPlanWithCast() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd.txt' as (id, c);"); + lpt.buildPlan("b = filter a by (int)id==10;"); + LogicalPlan plan = lpt.buildPlan("store b into 'empty';"); + + // check basics + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + assertEquals(3, newPlan.size()); + assertEquals(newPlan.getSources().size(), 1); + + // check load + LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class); + + // check filter + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class); + + LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan(); + + EqualExpression eq = (EqualExpression)exp.getSources().get(0); + assertEquals(eq.getLhs().getClass(), CastExpression.class); + + assertEquals(eq.getLhs().getClass(), CastExpression.class); + LogicalExpression ep = (LogicalExpression)exp.getSuccessors(eq.getLhs()).get(0); + assertEquals(ep.getClass(), ProjectExpression.class); + assertEquals(((ProjectExpression)ep).getColNum(), 0); + assertEquals(((ProjectExpression)ep).getInputNum(), 0); + + assertEquals(eq.getRhs().getClass(), ConstantExpression.class); + + // check store + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class); + } + + public void testJoinPlan() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load 'd1.txt' as (id, c);"); + lpt.buildPlan("b = load 'd2.txt'as (id, c);"); + lpt.buildPlan("c = join a by id, b by c;"); + lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;"); + LogicalPlan plan = lpt.buildPlan("store d into 'empty';"); + + // check basics + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + assertEquals(5, newPlan.size()); + assertEquals(newPlan.getSources().size(), 2); + + // check load and join + LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSuccessors(newPlan.getSources().get(0)).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOJoin.class); + assertEquals(((LOJoin)op).getJoinType(), LOJoin.JOINTYPE.HASH); + + LogicalRelationalOperator l1 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0); + assertEquals(l1.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class); + assertEquals(l1.getAlias(), "a"); + + LogicalRelationalOperator l2 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(1); + assertEquals(l2.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class); + assertEquals(l2.getAlias(), "b"); + + // check join input plans + LogicalExpressionPlan p1 = ((LOJoin)op).getJoinPlan(0).iterator().next(); + assertEquals(p1.size(), 1); + + ProjectExpression prj = (ProjectExpression)p1.getSources().get(0); + + assertEquals(prj.getInputNum(), 0); + assertEquals(prj.getColNum(), 0); + + LogicalExpressionPlan p2 = ((LOJoin)op).getJoinPlan(1).iterator().next(); + assertEquals(p2.size(), 1); + + prj = (ProjectExpression)p2.getSources().get(0); + + assertEquals(prj.getInputNum(), 1); + assertEquals(prj.getColNum(), 1); + + // check filter + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class); + LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan(); + + AndExpression ae = (AndExpression)exp.getSources().get(0); + + EqualExpression eq = (EqualExpression)exp.getSuccessors(ae).get(0); + assertEquals(eq.getLhs().getClass(), ProjectExpression.class); + assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0); + assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0); + + assertEquals(eq.getRhs().getClass(), ConstantExpression.class); + + eq = (EqualExpression)exp.getSuccessors(ae).get(1); + assertEquals(eq.getLhs().getClass(), ProjectExpression.class); + assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 3); + assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0); + + assertEquals(eq.getRhs().getClass(), ConstantExpression.class); + + // check store + op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0); + assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class); + } + + public void testForeachPlan() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, d);"); + lpt.buildPlan("b = foreach a generate id, FLATTEN(d);"); + LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';"); + + // check basics + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + + org.apache.pig.experimental.logical.relational.LogicalPlan expected = + new org.apache.pig.experimental.logical.relational.LogicalPlan(); + + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY)); + LOLoad load = new LOLoad(new FileSpec("file:///test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected); + expected.add(load); + + LOForEach foreach = new LOForEach(expected); + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan(); + LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach, 0); + innerPlan.add(l1); + LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach, 1); + + List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>(); + LogicalExpressionPlan p1 = new LogicalExpressionPlan(); + p1.add(new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0)); + LogicalExpressionPlan p2 = new LogicalExpressionPlan(); + p2.add(new ProjectExpression(p2, DataType.BYTEARRAY, 1, 0)); + eps.add(p1); + eps.add(p2); + + LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true}); + innerPlan.add(gen); + innerPlan.connect(l1, gen); + innerPlan.connect(l2, gen); + + foreach.setInnerPlan(innerPlan); + expected.add(foreach); + + LOStore s = new LOStore(expected, new FileSpec("file:///test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage"))); + + expected.add(s); + + expected.connect(load, foreach); + expected.connect(foreach, s); + + try { + UidStamper stamper = new UidStamper(expected); + stamper.visit(); + }catch(Exception e) { + throw new VisitorException(e); + } + + assertTrue(expected.isEqual(newPlan)); + + LogicalSchema schema = foreach.getSchema(); + aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY)); + assertTrue(schema.isEqual(aschema)); + } + + public void testForeachSchema() throws Exception { + // test flatten + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, d:tuple(v, s));"); + LogicalPlan plan = lpt.buildPlan("b = foreach a generate id, FLATTEN(d);"); + + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSinks().get(0); + + LogicalSchema s2 = new LogicalSchema(); + s2.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + s2.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + s2.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); + assertTrue(s2.isEqual(op.getSchema())); + + // test no flatten + lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});"); + plan = lpt.buildPlan("b = foreach a generate id, d;"); + + newPlan = migratePlan(plan); + op = (LogicalRelationalOperator)newPlan.getSinks().get(0); + + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + LogicalSchema aschema2 = new LogicalSchema(); + LogicalSchema aschema3 = new LogicalSchema(); + aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); + aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG)); + + assertTrue(aschema.isEqual(op.getSchema())); + } + + public void testForeachPlan2() throws Exception { + LogicalPlanTester lpt = new LogicalPlanTester(); + lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});"); + lpt.buildPlan("b = foreach a generate id, FLATTEN(d);"); + LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';"); + + // check basics + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan); + + org.apache.pig.experimental.logical.relational.LogicalPlan expected = + new org.apache.pig.experimental.logical.relational.LogicalPlan(); + + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + LogicalSchema aschema2 = new LogicalSchema(); + LogicalSchema aschema3 = new LogicalSchema(); + aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); + aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG)); + + LOLoad load = new LOLoad(new FileSpec("file:///test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected); + expected.add(load); + + LOForEach foreach2 = new LOForEach(expected); + org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan(); + LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach2, 0); + innerPlan.add(l1); + LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach2, 1); + + List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>(); + LogicalExpressionPlan p1 = new LogicalExpressionPlan(); + new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0); + LogicalExpressionPlan p2 = new LogicalExpressionPlan(); + new ProjectExpression(p2, DataType.BAG, 1, 0); + eps.add(p1); + eps.add(p2); + + LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true}); + innerPlan.add(gen); + innerPlan.connect(l1, gen); + innerPlan.connect(l2, gen); + + foreach2.setInnerPlan(innerPlan); + expected.add(foreach2); + + LOStore s = new LOStore(expected, new FileSpec("file:///test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage"))); + + expected.add(s); + + expected.connect(load, foreach2); + + expected.connect(foreach2, s); + try { + UidStamper stamper = new UidStamper(expected); + stamper.visit(); + }catch(Exception e) { + throw new VisitorException(e); + } + + assertTrue(expected.isEqual(newPlan)); + + LogicalSchema schema = foreach2.getSchema(); + aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY)); + aschema.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY)); + assertTrue(schema.isEqual(aschema)); + } + + private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{ + LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp); + visitor.visit(); + + org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan(); + try { + UidStamper stamper = new UidStamper(newPlan); + stamper.visit(); + + return newPlan; + }catch(Exception e) { + throw new VisitorException(e); + } + } +} Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Thu Feb 18 22:20:07 2010 @@ -108,6 +108,66 @@ myPig = null; } + public void testMultiQueryJiraPig1169() { + + // test case: Problems with some top N queries + + String INPUT_FILE = "abc"; + + try { + + PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); + w.println("1\t2\t3"); + w.println("2\t3\t4"); + w.println("3\t4\t5"); + w.println("5\t6\t7"); + w.println("6\t7\t8"); + w.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE); + + myPig.setBatchOn(); + + myPig.registerQuery("A = load '" + INPUT_FILE + + "' as (a:int, b, c);"); + myPig.registerQuery("A1 = Order A by a desc parallel 3;"); + myPig.registerQuery("A2 = limit A1 2;"); + myPig.registerQuery("store A1 into '/tmp/input1';"); + myPig.registerQuery("store A2 into '/tmp/input2';"); + + myPig.executeBatch(); + + myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);"); + + Iterator<Tuple> iter = myPig.openIterator("B"); + + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] { + "(6,7,8)", + "(5,6,7)" + }); + + int counter = 0; + while (iter.hasNext()) { + assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); + } + + assertEquals(expectedResults.size(), counter); + + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } finally { + new File(INPUT_FILE).delete(); + try { + Util.deleteFile(cluster, INPUT_FILE); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + public void testMultiQueryJiraPig1171() { // test case: Problems with some top N queries Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=911616&r1=911615&r2=911616&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Thu Feb 18 22:20:07 2010 @@ -149,12 +149,16 @@ // generate jar file String jarName = "TestUDFJar.jar"; + String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName; status = Util.executeShellCommand("jar -cf " + tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName + " -C " + tmpDir.getAbsolutePath() + " " + "com"); assertTrue(status==0); + Properties properties = cluster.getProperties(); + PigContext pigContext = new PigContext(ExecType.MAPREDUCE, properties); - PigServer pig = new PigServer(pigContext); - pig.registerJar(tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName); + //register jar using properties + pigContext.getProperties().setProperty("pig.additional.jars", jarFile); + PigServer pigServer = new PigServer(pigContext); PigContext.initializeImportList("com.xxx.udf1:com.xxx.udf2."); ArrayList<String> importList = PigContext.getPackageImportList(); @@ -180,7 +184,6 @@ } Util.createInputFile(cluster, tmpFile.getCanonicalPath(), input); FileLocalizer.deleteTempFiles(); - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); pigServer.registerQuery("A = LOAD '" + tmpFile.getCanonicalPath() + "' using TestUDF2() AS (num:chararray);"); pigServer.registerQuery("B = foreach A generate TestUDF1(num);"); Iterator<Tuple> iter = pigServer.openIterator("B");
