Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Jan 22 13:17:12 2008 @@ -19,7 +19,6 @@ import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; @@ -73,7 +72,6 @@ assertFalse(da1.compareTo(da2) > 0); } - /* Replaced by TestTuple.java @Test public void testTuple() throws Exception { int arity = 5; @@ -160,9 +158,8 @@ n1.appendTuple(n2); assertTrue(n1.arity() == n1Arity + n2Arity); } - */ - /* Replaced by TestDataBag.java + /* @Test public void testDataBag() throws Exception { int[] input1 = { 1, 2, 3, 4, 5 }; @@ -197,8 +194,19 @@ caught = true; } assertTrue(caught); + + // check that notifications are sent + b.clear(); + DataBag.notifyInterval = 2; + Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1); + for (int i = 0; i < 10; i++) { + b.add(g); + } + + Iterator it = b.content(); + while (it.hasNext()) it.next(); + assert(b.numNotifies == 5); } - */ @Test @@ -208,198 +216,81 @@ public void testBigDataBagOnDisk() throws Exception{ Runtime.getRuntime().gc(); - //testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000); + testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000); } + */ + private enum TestType { + PRE_SORT, + POST_SORT, + PRE_DISTINCT, + POST_DISTINCT, + NONE + } + + /* private void testBigDataBag(long freeMemoryToMaintain, int numItems) throws Exception { BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain; - File tmp = File.createTempFile("test", "bag").getParentFile(); - BigDataBag bag = new BigDataBag(Datum.DataType.TUPLE, tmp); - Iterator<Datum> it; - int count; - //String last; - Tuple lastT = null; - Random r = new Random(); - - - //Basic test - assertTrue(bag.isEmpty()); - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(2); - t.setField(0, Integer.toHexString(i)); - t.setField(1, i); - bag.add(t); - } - - assertFalse(bag.isEmpty()); - - assertTrue(bag.cardinality() == numItems); - - int lastI = -1; - it = bag.content(); - count = 0; - while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - int ix = Integer.parseInt(t.getAtomField(0).strval(), 16); - assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval())); - assertEquals(lastI+1, ix); - lastI = ix; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - //Test pre sort - - bag.sort(); - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - t.setField(0, r.nextInt(100000)); - bag.add(t); - } - - it = bag.content(); - count = 0; - // last= ""; - lastT = new Tuple(); - while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - /* - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<=0); - last = next; - */ - assertTrue("last should be <= next", lastT.compareTo(t) <= 0); - lastT = t; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - - //Test post sort - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - t.setField(0, r.nextInt(100000)); - bag.add(t); - } - - bag.sort(); + + for (TestType testType: TestType.values()){ + BigDataBag bag = BagFactory.getInstance().getNewBigBag(); + + assertTrue(bag.isEmpty()); + + if (testType == TestType.PRE_SORT) + bag.sort(); + else if (testType == TestType.PRE_DISTINCT) + bag.distinct(); + + //generate data and add it to the bag + for(int i = 0; i < numItems; i++) { + Tuple t = new Tuple(1); + t.setField(0, r.nextInt(numItems)); + bag.add(t); + } + + assertFalse(bag.isEmpty()); + + if (testType == TestType.POST_SORT) + bag.sort(); + else if (testType == TestType.POST_DISTINCT) + bag.distinct(); + + + if (testType == TestType.NONE) + assertTrue(bag.cardinality() == numItems); + checkContents(bag, numItems, testType); + checkContents(bag, numItems, testType); - it = bag.content(); - count = 0; - //last= ""; - lastT = new Tuple(); - while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - /* - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<=0); - last = next; - */ - assertTrue("last should be <= next", lastT.compareTo(t) <= 0); - lastT = t; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - //test post-distinct - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - //To get a lot of duplicates - t.setField(0, r.nextInt(1000)); - bag.add(t); - } + } + } + + + private void checkContents(DataBag bag, int numItems, TestType testType) throws Exception{ + String last = ""; + DataBag.notifyInterval = 100; - bag.distinct(); - - it = bag.content(); - count = 0; - //last= ""; - lastT = new Tuple(); + Iterator<Tuple> it = bag.content(); + int count = 0; while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - /* - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); + Tuple t = it.next(); + String next = t.getAtomField(0).strval(); + if (testType == TestType.POST_SORT || testType == TestType.PRE_SORT) + assertTrue(last.compareTo(next)<=0); + else if (testType == TestType.POST_DISTINCT || testType == TestType.PRE_DISTINCT) + assertTrue(last.compareTo(next)<0); last = next; - */ - assertTrue("last should be <= next", lastT.compareTo(t) <= 0); - lastT = t; - count++; - } - - assertTrue(bag.cardinality() == count); - - bag.clear(); - - - //Test pre distinct - - bag.distinct(); - - - for(int i = 0; i < numItems; i++) { - Tuple t = new Tuple(1); - //To get a lot of duplicates - t.setField(0, r.nextInt(10)); - bag.add(t); + count++; } - - it = bag.content(); - count = 0; - //last= ""; - lastT = new Tuple(); - while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - /* - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); - last = next; - */ - assertTrue("last should be <= next", lastT.compareTo(t) <= 0); - lastT = t; - count++; - } - - assertTrue(bag.cardinality() == count); - - //Check if it gives the correct contents the second time around - it = bag.content(); - count = 0; - //last= ""; - lastT = new Tuple(); - while(it.hasNext()) { - Tuple t = (Tuple)it.next(); - /* - String next = t.getAtomField(0).strval(); - assertTrue(last.compareTo(next)<0); - last = next; - */ - assertTrue("last should be <= next", lastT.compareTo(t) <= 0); - lastT = t; - count++; - } - assertTrue(bag.cardinality() == count); - bag.clear(); + if (testType != TestType.NONE) + assertTrue(bag.numNotifies >= count/DataBag.notifyInterval); } + */ + }
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Jan 22 13:17:12 2008 @@ -15,50 +15,49 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.test; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.StringTokenizer; - -import org.junit.Test; - -import org.apache.pig.EvalFunc; -import org.apache.pig.PigServer; -import org.apache.pig.builtin.BinStorage; -import org.apache.pig.builtin.PigStorage; -import org.apache.pig.builtin.TextLoader; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataMap; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.FileLocalizer; -import org.apache.pig.impl.io.PigFile; - -import junit.framework.TestCase; - -public class TestEvalPipeline extends TestCase { - - String initString = "mapreduce"; - - - static public class MyBagFunction extends EvalFunc<DataBag>{ - @Override - public void exec(Tuple input, DataBag output) throws IOException { - output.add(new Tuple("a")); - output.add(new Tuple("a")); - output.add(new Tuple("a")); - - } - } +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.StringTokenizer; + +import org.junit.Test; + +import org.apache.pig.EvalFunc; +import org.apache.pig.PigServer; +import org.apache.pig.builtin.BinStorage; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.builtin.TextLoader; +import org.apache.pig.data.*; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.PigFile; + +import junit.framework.TestCase; + +public class TestEvalPipeline extends TestCase { + + String initString = "mapreduce"; + + + static public class MyBagFunction extends EvalFunc<DataBag>{ + @Override + public void exec(Tuple input, DataBag output) throws IOException { + output.add(new Tuple("a")); + output.add(new Tuple("a")); + output.add(new Tuple("a")); + + } + } + private File createFile(String[] data) throws Exception{ File f = File.createTempFile("tmp", ""); @@ -84,232 +83,230 @@ assertEquals(iter.next().getAtomField(0).numval(), 0.0); } - } - - @Test - public void testJoin() throws Exception{ - PigServer pigServer = new PigServer(initString); - - File f1 = createFile(new String[]{"a:1","b:1","a:1"}); - File f2 = createFile(new String[]{"b","b","a"}); - - pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');"); - pigServer.registerQuery("b = load 'file:" + f2 + "';"); - pigServer.registerQuery("c = cogroup a by $0, b by $0;"); - pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);"); - - Iterator<Tuple> iter = pigServer.openIterator("d"); - int count = 0; - while(iter.hasNext()){ - Tuple t = iter.next(); - assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval())); - count++; - } - assertEquals(count, 4); - } - - @Test - public void testDriverMethod() throws Exception{ - PigServer pigServer = new PigServer(initString); - File f = File.createTempFile("tmp", ""); - PrintWriter pw = new PrintWriter(f); - pw.println("a"); - pw.println("a"); - pw.close(); - pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));"); - pigServer.registerQuery("b = foreach a generate $0, flatten($1);"); - Iterator<Tuple> iter = pigServer.openIterator("a"); - int count = 0; - while(iter.hasNext()){ - Tuple t = iter.next(); - assertTrue(t.getAtomField(0).strval().equals("1")); - assertTrue(t.getAtomField(1).strval().equals("a")); - count++; - } - assertEquals(count, 6); - f.delete(); - } - - - /* Replaced by TestDataMap.java - @Test - public void testMapLookup() throws IOException{ - PigServer pigServer = new PigServer(initString); - DataBag b = new DataBag(); - DataMap colors = new DataMap(); - colors.put("apple","red"); - colors.put("orange","orange"); - - DataMap weights = new DataMap(); - weights.put("apple","0.1"); - weights.put("orange","0.3"); - - Tuple t = new Tuple(); - t.appendField(colors); - t.appendField(weights); - b.add(t); - - String fileName = "file:"+File.createTempFile("tmp", ""); - PigFile f = new PigFile(fileName); - f.store(b, new BinStorage(), pigServer.getPigContext()); - - - pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();"); - pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');"); - Iterator<Tuple> iter = pigServer.openIterator("b"); - t = iter.next(); - assertEquals(t.getAtomField(0).strval(), "red"); - assertEquals(t.getAtomField(1).numval(), 0.3); - assertFalse(iter.hasNext()); - } - */ - - - static public class TitleNGrams extends EvalFunc<DataBag> { - - @Override - public void exec(Tuple input, DataBag output) throws IOException { - String str = input.getAtomField(0).strval(); - - String title = str; - - if (title != null) { - List<String> nGrams = makeNGrams(title); - - for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) { - Tuple t = new Tuple(1); - t.setField(0, it.next()); - output.add(t); - } - } - } - - - List<String> makeNGrams(String str) { - List<String> tokens = new ArrayList<String>(); - - StringTokenizer st = new StringTokenizer(str); - while (st.hasMoreTokens()) - tokens.add(st.nextToken()); - - return nGramHelper(tokens, new ArrayList<String>()); - } - - ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) { - if (str.size() == 0) - return nGrams; - - for (int i = 0; i < str.size(); i++) - nGrams.add(makeString(str.subList(0, i+1))); - - return nGramHelper(str.subList(1, str.size()), nGrams); - } - - String makeString(List<String> list) { - StringBuffer sb = new StringBuffer(); - for (Iterator<String> it = list.iterator(); it.hasNext(); ) { - sb.append(it.next()); - if (it.hasNext()) - sb.append(" "); - } - return sb.toString(); - } - } - - - - - @Test - public void testBagFunctionWithFlattening() throws Exception{ - PigServer pigServer = new PigServer(initString); - File queryLogFile = createFile( - new String[]{ - "stanford\tdeer\tsighting", - "bush\tpresident", - "stanford\tbush", - "conference\tyahoo", - "world\tcup\tcricket", - "bush\twins", - "stanford\tpresident", - } - ); - - File newsFile = createFile( - new String[]{ - "deer seen at stanford", - "george bush visits stanford", - "yahoo hosting a conference in the bay area", - "who will win the world cup" - } - ); - - Map<String, Integer> expectedResults = new HashMap<String, Integer>(); - expectedResults.put("bush", 2); - expectedResults.put("stanford", 3); - expectedResults.put("world", 1); - expectedResults.put("conference", 1); - - pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();"); - pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';"); - - pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));"); - pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;"); - pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;"); - - Iterator<Tuple> iter = pigServer.openIterator("answer"); - while(iter.hasNext()){ - Tuple t = iter.next(); - assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue()); - } - } - - - - @Test - public void testSort() throws Exception{ - testSortDistinct(false); - } - - - @Test - public void testDistinct() throws Exception{ - testSortDistinct(true); - } - - private void testSortDistinct(boolean eliminateDuplicates) throws Exception{ - int LOOP_SIZE = 1024*16; - File tmpFile = File.createTempFile("test", "txt"); - PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); - Random r = new Random(); - for(int i = 0; i < LOOP_SIZE; i++) { - ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i); - } - ps.close(); - - PigServer pig = new PigServer(initString); - String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString(); - pig.registerQuery("A = LOAD 'file:" + tmpFile + "';"); - if (eliminateDuplicates){ - pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;"); - }else{ - pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;"); - } - pig.store("B", tmpOutputFile); - - pig.registerQuery("A = load '" + tmpOutputFile + "';"); - Iterator<Tuple> iter = pig.openIterator("A"); - int last = -1; - while (iter.hasNext()){ - Tuple t = iter.next(); - if (eliminateDuplicates){ - assertTrue(last < t.getAtomField(0).numval().intValue()); - }else{ - assertTrue(last <= t.getAtomField(0).numval().intValue()); - assertEquals(t.arity(), 2); - } - } - - } - - -} + } + + @Test + public void testJoin() throws Exception{ + PigServer pigServer = new PigServer(initString); + + File f1 = createFile(new String[]{"a:1","b:1","a:1"}); + File f2 = createFile(new String[]{"b","b","a"}); + + pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');"); + pigServer.registerQuery("b = load 'file:" + f2 + "';"); + pigServer.registerQuery("c = cogroup a by $0, b by $0;"); + pigServer.registerQuery("d = foreach c generate flatten($1),flatten($2);"); + + Iterator<Tuple> iter = pigServer.openIterator("d"); + int count = 0; + while(iter.hasNext()){ + Tuple t = iter.next(); + assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval())); + count++; + } + assertEquals(count, 4); + } + + @Test + public void testDriverMethod() throws Exception{ + PigServer pigServer = new PigServer(initString); + File f = File.createTempFile("tmp", ""); + PrintWriter pw = new PrintWriter(f); + pw.println("a"); + pw.println("a"); + pw.close(); + pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));"); + pigServer.registerQuery("b = foreach a generate $0, flatten($1);"); + Iterator<Tuple> iter = pigServer.openIterator("a"); + int count = 0; + while(iter.hasNext()){ + Tuple t = iter.next(); + assertTrue(t.getAtomField(0).strval().equals("1")); + assertTrue(t.getAtomField(1).strval().equals("a")); + count++; + } + assertEquals(count, 6); + f.delete(); + } + + + @Test + public void testMapLookup() throws IOException{ + PigServer pigServer = new PigServer(initString); + DataBag b = BagFactory.getInstance().newDefaultBag(); + DataMap colors = new DataMap(); + colors.put("apple","red"); + colors.put("orange","orange"); + + DataMap weights = new DataMap(); + weights.put("apple","0.1"); + weights.put("orange","0.3"); + + Tuple t = new Tuple(); + t.appendField(colors); + t.appendField(weights); + b.add(t); + + String fileName = "file:"+File.createTempFile("tmp", ""); + PigFile f = new PigFile(fileName); + f.store(b, new BinStorage(), pigServer.getPigContext()); + + + pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();"); + pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');"); + Iterator<Tuple> iter = pigServer.openIterator("b"); + t = iter.next(); + assertEquals(t.getAtomField(0).strval(), "red"); + assertEquals(t.getAtomField(1).numval(), 0.3); + assertFalse(iter.hasNext()); + } + + + static public class TitleNGrams extends EvalFunc<DataBag> { + + @Override + public void exec(Tuple input, DataBag output) throws IOException { + String str = input.getAtomField(0).strval(); + + String title = str; + + if (title != null) { + List<String> nGrams = makeNGrams(title); + + for (Iterator<String> it = nGrams.iterator(); it.hasNext(); ) { + Tuple t = new Tuple(1); + t.setField(0, it.next()); + output.add(t); + } + } + } + + + List<String> makeNGrams(String str) { + List<String> tokens = new ArrayList<String>(); + + StringTokenizer st = new StringTokenizer(str); + while (st.hasMoreTokens()) + tokens.add(st.nextToken()); + + return nGramHelper(tokens, new ArrayList<String>()); + } + + ArrayList<String> nGramHelper(List<String> str, ArrayList<String> nGrams) { + if (str.size() == 0) + return nGrams; + + for (int i = 0; i < str.size(); i++) + nGrams.add(makeString(str.subList(0, i+1))); + + return nGramHelper(str.subList(1, str.size()), nGrams); + } + + String makeString(List<String> list) { + StringBuffer sb = new StringBuffer(); + for (Iterator<String> it = list.iterator(); it.hasNext(); ) { + sb.append(it.next()); + if (it.hasNext()) + sb.append(" "); + } + return sb.toString(); + } + } + + + + + @Test + public void testBagFunctionWithFlattening() throws Exception{ + PigServer pigServer = new PigServer(initString); + File queryLogFile = createFile( + new String[]{ + "stanford\tdeer\tsighting", + "bush\tpresident", + "stanford\tbush", + "conference\tyahoo", + "world\tcup\tcricket", + "bush\twins", + "stanford\tpresident", + } + ); + + File newsFile = createFile( + new String[]{ + "deer seen at stanford", + "george bush visits stanford", + "yahoo hosting a conference in the bay area", + "who will win the world cup" + } + ); + + Map<String, Integer> expectedResults = new HashMap<String, Integer>(); + expectedResults.put("bush", 2); + expectedResults.put("stanford", 3); + expectedResults.put("world", 1); + expectedResults.put("conference", 1); + + pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile + "' USING " + TextLoader.class.getName() + "();"); + pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + "';"); + + pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE flatten(" + TitleNGrams.class.getName() + "(*));"); + pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 INNER, queryLog BY $0 INNER;"); + pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;"); + + Iterator<Tuple> iter = pigServer.openIterator("answer"); + while(iter.hasNext()){ + Tuple t = iter.next(); + assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue()); + } + } + + + + @Test + public void testSort() throws Exception{ + testSortDistinct(false); + } + + + @Test + public void testDistinct() throws Exception{ + testSortDistinct(true); + } + + private void testSortDistinct(boolean eliminateDuplicates) throws Exception{ + int LOOP_SIZE = 1024*16; + File tmpFile = File.createTempFile("test", "txt"); + PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + Random r = new Random(); + for(int i = 0; i < LOOP_SIZE; i++) { + ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i); + } + ps.close(); + + PigServer pig = new PigServer(initString); + String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString(); + pig.registerQuery("A = LOAD 'file:" + tmpFile + "';"); + if (eliminateDuplicates){ + pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;"); + }else{ + pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;"); + } + pig.store("B", tmpOutputFile); + + pig.registerQuery("A = load '" + tmpOutputFile + "';"); + Iterator<Tuple> iter = pig.openIterator("A"); + int last = -1; + while (iter.hasNext()){ + Tuple t = iter.next(); + if (eliminateDuplicates){ + assertTrue(last < t.getAtomField(0).numval().intValue()); + }else{ + assertTrue(last <= t.getAtomField(0).numval().intValue()); + assertEquals(t.arity(), 2); + } + } + + } + + +} Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan 22 13:17:12 2008 @@ -78,9 +78,9 @@ } @Override public void exec(Tuple input, DataBag output) throws IOException { - Iterator<Datum> it = (input.getBagField(0)).content(); + Iterator<Tuple> it = (input.getBagField(0)).iterator(); while(it.hasNext()) { - Tuple t = (Tuple)it.next(); + Tuple t = it.next(); Tuple newT = new Tuple(2); newT.setField(0, field0); newT.setField(1, t.getField(0).toString()); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java Tue Jan 22 13:17:12 2008 @@ -33,18 +33,14 @@ import org.apache.pig.PigServer; import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; -import org.apache.pig.data.DataAtom; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataMap; -import org.apache.pig.data.Datum; -import org.apache.pig.data.Tuple; +import org.apache.pig.data.*; import org.apache.pig.PigServer.ExecType; import org.apache.pig.impl.io.PigFile; import org.apache.pig.impl.PigContext; public class TestPigFile extends TestCase { - DataBag bag = new DataBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().newDefaultBag(); Random rand = new Random(); @Override @@ -89,13 +85,13 @@ DataBag loaded = load.load(new PigStorage(), pigContext); System.out.println("Done."); - assertTrue(bag.cardinality() == loaded.cardinality()); + assertTrue(bag.size() == loaded.size()); - Iterator<Datum> it1 = bag.content(); - Iterator<Datum> it2 = loaded.content(); + Iterator<Tuple> it1 = bag.iterator(); + Iterator<Tuple> it2 = loaded.iterator(); while (it1.hasNext() && it2.hasNext()) { - Tuple f1 = (Tuple)it1.next(); - Tuple f2 = (Tuple)it2.next(); + Tuple f1 = it1.next(); + Tuple f2 = it2.next(); assertTrue(f1.equals(f2)); } assertFalse(it1.hasNext() || it2.hasNext()); @@ -131,7 +127,7 @@ private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{ int cardinality = rand.nextInt(maxCardinality)+1; - DataBag b = new DataBag(Datum.DataType.TUPLE); + DataBag b = BagFactory.getInstance().newDefaultBag(); for (int i=0; i<cardinality; i++){ Tuple t = getRandomTuple(nestingLevel+1); b.add(t); @@ -168,14 +164,13 @@ DataBag loaded = load.load(new BinStorage(), pigContext); System.out.println("Done."); - assertTrue(bag.cardinality() == loaded.cardinality()); + assertTrue(bag.size() == loaded.size()); - Iterator<Datum> it1 = bag.content(); - Iterator<Datum> it2 = loaded.content(); - //while (it1.hasNext() && it2.hasNext()) { - for (int i = 0; it1.hasNext() && it2.hasNext(); i++) { - Tuple f1 = (Tuple)it1.next(); - Tuple f2 = (Tuple)it2.next(); + Iterator<Tuple> it1 = bag.iterator(); + Iterator<Tuple> it2 = loaded.iterator(); + while (it1.hasNext() && it2.hasNext()) { + Tuple f1 = it1.next(); + Tuple f2 = it2.next(); assertTrue(f1.equals(f2)); } assertFalse(it1.hasNext() || it2.hasNext()); Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Tue Jan 22 13:17:12 2008 @@ -19,8 +19,7 @@ import java.io.IOException; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.Tuple; +import org.apache.pig.data.*; import org.apache.pig.data.Datum; import org.apache.pig.data.DataAtom; @@ -29,7 +28,7 @@ // ================= static public Tuple loadFlatTuple(Tuple t, int[] input) throws IOException { for (int i = 0; i < input.length; i++) { - t.setField(i, new DataAtom(new Integer(input[i]).toString())); + t.setField(i, input[i]); } return t; } @@ -42,7 +41,7 @@ } static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException { - DataBag bag = new DataBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().newDefaultBag(); for(int i = 0; i < input.length; i++) { Tuple f = new Tuple(1); f.setField(0, input[i]); @@ -54,7 +53,7 @@ static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException { for (int i = 0; i < input.length; i++) { - DataBag bag = new DataBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().newDefaultBag(); Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]); bag.add(f); t.setField(i, bag); @@ -64,7 +63,7 @@ static public Tuple loadTuple(Tuple t, String[][] input) throws IOException { for (int i = 0; i < input.length; i++) { - DataBag bag = new DataBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().newDefaultBag(); Tuple f = loadTuple(new Tuple(input[i].length), input[i]); bag.add(f); t.setField(i, bag);