Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Tue Jan 22 13:17:12 2008
@@ -19,7 +19,6 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -73,7 +72,6 @@
         assertFalse(da1.compareTo(da2) > 0);
     }
 
-       /* Replaced by TestTuple.java
     @Test
     public void testTuple() throws Exception {
         int arity = 5;
@@ -160,9 +158,8 @@
         n1.appendTuple(n2);
         assertTrue(n1.arity() == n1Arity + n2Arity);
     }
-       */
 
-       /* Replaced by TestDataBag.java 
+    /*
     @Test
     public void testDataBag() throws Exception {
         int[] input1 = { 1, 2, 3, 4, 5 };
@@ -197,8 +194,19 @@
             caught = true;
         }   
         assertTrue(caught);
+
+        // check that notifications are sent
+         b.clear();
+         DataBag.notifyInterval = 2;
+         Tuple g = Util.loadFlatTuple(new Tuple(input1.length), input1);
+         for (int i = 0; i < 10; i++) {
+             b.add(g);
+         }
+
+         Iterator it = b.content();
+         while (it.hasNext()) it.next();
+         assert(b.numNotifies == 5);
     }
-       */
 
     @Test
     
@@ -208,198 +216,81 @@
 
     public void testBigDataBagOnDisk() throws Exception{
        Runtime.getRuntime().gc();
-       //testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 10000);
+       testBigDataBag(Runtime.getRuntime().maxMemory() - 1*1024*1024, 1000000);
     }
+    */
 
+    private enum TestType {
+       PRE_SORT,
+       POST_SORT,
+       PRE_DISTINCT,
+       POST_DISTINCT,
+       NONE
+    }
+       
     
+    /*
     private void testBigDataBag(long freeMemoryToMaintain, int numItems) 
throws Exception {
        BigDataBag.FREE_MEMORY_TO_MAINTAIN = freeMemoryToMaintain;
-       File tmp = File.createTempFile("test", "bag").getParentFile();
-        BigDataBag bag = new BigDataBag(Datum.DataType.TUPLE, tmp);
-        Iterator<Datum> it;
-        int count;
-        //String last;
-               Tuple lastT = null;
-    
         Random r = new Random();
-        
-        
-        //Basic test
-        assertTrue(bag.isEmpty());
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(2);
-            t.setField(0, Integer.toHexString(i));
-            t.setField(1, i);
-            bag.add(t);
-        }
-        
-        assertFalse(bag.isEmpty());
-
-        assertTrue(bag.cardinality() == numItems);
-        
-        int lastI = -1;
-        it = bag.content();
-        count = 0;
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-            int ix = Integer.parseInt(t.getAtomField(0).strval(), 16);
-            
assertTrue(Integer.toString(ix).equals(t.getAtomField(1).strval()));
-            assertEquals(lastI+1, ix);
-            lastI = ix;
-            count++;
-        }
-        
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-        //Test pre sort
-        
-        bag.sort();
-        
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        it = bag.content();
-        count = 0;
-        // last= "";
-               lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-                       /*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-                       */
-                       assertTrue("last should be <= next", lastT.compareTo(t) 
<= 0);
-                       lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-
-
-        //Test post sort
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            t.setField(0, r.nextInt(100000));
-            bag.add(t);
-        }
-        
-        bag.sort();
+   
+       for (TestType testType: TestType.values()){
+               BigDataBag bag = BagFactory.getInstance().getNewBigBag();
+
+            assertTrue(bag.isEmpty());
+
+            if (testType == TestType.PRE_SORT)
+               bag.sort();
+            else if (testType == TestType.PRE_DISTINCT)
+               bag.distinct();
+            
+            //generate data and add it to the bag
+            for(int i = 0; i < numItems; i++) {
+                Tuple t = new Tuple(1);
+                t.setField(0, r.nextInt(numItems));
+                bag.add(t);
+            }
+
+            assertFalse(bag.isEmpty());
+
+            if (testType == TestType.POST_SORT)
+               bag.sort();
+            else if (testType == TestType.POST_DISTINCT)
+               bag.distinct();
+
+            
+            if (testType == TestType.NONE)
+               assertTrue(bag.cardinality() == numItems);
+            checkContents(bag, numItems, testType);
+            checkContents(bag, numItems, testType);
 
-        it = bag.content();
-        count = 0;
-        //last= "";
-               lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-                       /*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<=0);
-            last = next;
-                       */
-                       assertTrue("last should be <= next", lastT.compareTo(t) 
<= 0);
-                       lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-               
-        //test post-distinct
-        
-       
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(1000));
-            bag.add(t);
-        }
+       }
+    }
+     
+    
+    private void checkContents(DataBag bag, int numItems, TestType testType) 
throws Exception{
+        String last = "";
         
+        DataBag.notifyInterval = 100;
         
-        bag.distinct();
-
-        it = bag.content();
-        count = 0;
-        //last= "";
-               lastT = new Tuple();
+        Iterator<Tuple> it = bag.content();
+        int count = 0;
         while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-                       /*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
+               Tuple t = it.next();
+               String next = t.getAtomField(0).strval();
+               if (testType == TestType.POST_SORT || testType == 
TestType.PRE_SORT)
+                assertTrue(last.compareTo(next)<=0);
+               else if (testType == TestType.POST_DISTINCT || testType == 
TestType.PRE_DISTINCT)
+                assertTrue(last.compareTo(next)<0);
             last = next;
-                       */
-                       assertTrue("last should be <= next", lastT.compareTo(t) 
<= 0);
-                       lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-        
-        bag.clear();
-               
-        
-        //Test pre distinct
-
-        bag.distinct();
-
-        
-        for(int i = 0; i < numItems; i++) {
-            Tuple t = new Tuple(1);
-            //To get a lot of duplicates
-            t.setField(0, r.nextInt(10));
-            bag.add(t);
+               count++;
         }
         
-
-        it = bag.content();
-        count = 0;
-        //last= "";
-               lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-                       /*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-                       */
-                       assertTrue("last should be <= next", lastT.compareTo(t) 
<= 0);
-                       lastT = t;
-            count++;
-        }
-
-        assertTrue(bag.cardinality() == count);
-
-        //Check if it gives the correct contents the second time around
-        it = bag.content();
-        count = 0;
-        //last= "";
-               lastT = new Tuple();
-        while(it.hasNext()) {
-            Tuple t = (Tuple)it.next();
-                       /*
-            String next = t.getAtomField(0).strval();
-            assertTrue(last.compareTo(next)<0);
-            last = next;
-                       */
-                       assertTrue("last should be <= next", lastT.compareTo(t) 
<= 0);
-                       lastT = t;
-            count++;
-        }
-
         assertTrue(bag.cardinality() == count);
         
-        bag.clear();
+        if (testType != TestType.NONE)
+               assertTrue(bag.numNotifies >= count/DataBag.notifyInterval);
     }
+    */
+
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Jan 22 13:17:12 2008
@@ -15,50 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.StringTokenizer;
-
-import org.junit.Test;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigServer;
-import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.TextLoader;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.PigFile;
-
-import junit.framework.TestCase;
-
-public class TestEvalPipeline extends TestCase {
-       
-       String initString = "mapreduce";
-       
-       
-       static public class MyBagFunction extends EvalFunc<DataBag>{
-               @Override
-               public void exec(Tuple input, DataBag output) throws 
IOException {
-                       output.add(new Tuple("a"));
-                       output.add(new Tuple("a"));
-                       output.add(new Tuple("a"));
-                       
-               }
-       }
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.junit.Test;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.TextLoader;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.PigFile;
+
+import junit.framework.TestCase;
+
+public class TestEvalPipeline extends TestCase {
+       
+       String initString = "mapreduce";
+       
+       
+       static public class MyBagFunction extends EvalFunc<DataBag>{
+               @Override
+               public void exec(Tuple input, DataBag output) throws 
IOException {
+                       output.add(new Tuple("a"));
+                       output.add(new Tuple("a"));
+                       output.add(new Tuple("a"));
+                       
+               }
+       }
+       
        
        private File createFile(String[] data) throws Exception{
                File f = File.createTempFile("tmp", "");
@@ -84,232 +83,230 @@
                        assertEquals(iter.next().getAtomField(0).numval(), 0.0);
                }
                
-       }
-       
-       @Test
-       public void testJoin() throws Exception{
-               PigServer pigServer = new PigServer(initString);
-               
-               File f1 = createFile(new String[]{"a:1","b:1","a:1"});
-               File f2 = createFile(new String[]{"b","b","a"});
-               
-               pigServer.registerQuery("a = load 'file:" + f1 + "' using " + 
PigStorage.class.getName() + "(':');");
-               pigServer.registerQuery("b = load 'file:" + f2 + "';");
-               pigServer.registerQuery("c = cogroup a by $0, b by $0;");       
        
-               pigServer.registerQuery("d = foreach c generate 
flatten($1),flatten($2);");
-               
-               Iterator<Tuple> iter = pigServer.openIterator("d");
-               int count = 0;
-               while(iter.hasNext()){
-                       Tuple t = iter.next();
-                       
assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
-                       count++;
-               }
-               assertEquals(count, 4);
-       }
-       
-       @Test
-       public void testDriverMethod() throws Exception{
-               PigServer pigServer = new PigServer(initString);
-               File f = File.createTempFile("tmp", "");
-               PrintWriter pw = new PrintWriter(f);
-               pw.println("a");
-               pw.println("a");
-               pw.close();
-               pigServer.registerQuery("a = foreach (load 'file:" + f + "') 
generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
-               pigServer.registerQuery("b = foreach a generate $0, 
flatten($1);");
-               Iterator<Tuple> iter = pigServer.openIterator("a");
-               int count = 0;
-               while(iter.hasNext()){
-                       Tuple t = iter.next();
-                       assertTrue(t.getAtomField(0).strval().equals("1"));
-                       assertTrue(t.getAtomField(1).strval().equals("a"));
-                       count++;
-               }
-               assertEquals(count, 6);
-               f.delete();
-       }
-       
-       
-       /* Replaced by TestDataMap.java
-       @Test
-       public void testMapLookup() throws IOException{
-               PigServer pigServer = new PigServer(initString);
-               DataBag b = new DataBag();
-               DataMap colors = new DataMap();
-               colors.put("apple","red");
-               colors.put("orange","orange");
-               
-               DataMap weights = new DataMap();
-               weights.put("apple","0.1");
-               weights.put("orange","0.3");
-               
-               Tuple t = new Tuple();
-               t.appendField(colors);
-               t.appendField(weights);
-               b.add(t);
-               
-               String fileName = "file:"+File.createTempFile("tmp", "");
-               PigFile f = new PigFile(fileName);
-               f.store(b, new BinStorage(), pigServer.getPigContext());
-               
-               
-               pigServer.registerQuery("a = load '" + fileName + "' using 
BinStorage();");
-               pigServer.registerQuery("b = foreach a generate 
$0#'apple',flatten($1#'orange');");
-               Iterator<Tuple> iter = pigServer.openIterator("b");
-               t = iter.next();
-               assertEquals(t.getAtomField(0).strval(), "red");
-               assertEquals(t.getAtomField(1).numval(), 0.3);
-               assertFalse(iter.hasNext());
-       }
-       */
-       
-       
-       static public class TitleNGrams extends EvalFunc<DataBag> {
-               
-               @Override
-               public void exec(Tuple input, DataBag output) throws 
IOException {      
-                   String str = input.getAtomField(0).strval();
-                       
-                       String title = str;
-
-                       if (title != null) {
-                               List<String> nGrams = makeNGrams(title);
-                               
-                               for (Iterator<String> it = nGrams.iterator(); 
it.hasNext(); ) {
-                                       Tuple t = new Tuple(1);
-                                       t.setField(0, it.next());
-                                       output.add(t);
-                               }
-                       }
-           }
-               
-               
-               List<String> makeNGrams(String str) {
-                       List<String> tokens = new ArrayList<String>();
-                       
-                       StringTokenizer st = new StringTokenizer(str);
-                       while (st.hasMoreTokens())
-                               tokens.add(st.nextToken());
-                       
-                       return nGramHelper(tokens, new ArrayList<String>());
-               }
-               
-               ArrayList<String> nGramHelper(List<String> str, 
ArrayList<String> nGrams) {
-                       if (str.size() == 0)
-                               return nGrams;
-                       
-                       for (int i = 0; i < str.size(); i++)
-                               nGrams.add(makeString(str.subList(0, i+1)));
-                       
-                       return nGramHelper(str.subList(1, str.size()), nGrams);
-               }
-               
-               String makeString(List<String> list) {
-                       StringBuffer sb = new StringBuffer();
-                       for (Iterator<String> it = list.iterator(); 
it.hasNext(); ) {
-                               sb.append(it.next());
-                               if (it.hasNext())
-                                       sb.append(" ");
-                       }
-                       return sb.toString();
-               }
-       }
-
-       
-       
-       
-       @Test
-       public void testBagFunctionWithFlattening() throws Exception{
-               PigServer pigServer = new PigServer(initString);
-               File queryLogFile = createFile(
-                                       new String[]{ 
-                                               "stanford\tdeer\tsighting",
-                                               "bush\tpresident",
-                                               "stanford\tbush",
-                                               "conference\tyahoo",
-                                               "world\tcup\tcricket",
-                                               "bush\twins",
-                                               "stanford\tpresident",
-                                       }
-                               );
-                               
-               File newsFile = createFile(
-                                       new String[]{
-                                               "deer seen at stanford",
-                                               "george bush visits stanford", 
-                                               "yahoo hosting a conference in 
the bay area", 
-                                               "who will win the world cup"
-                                       }
-                               );      
-               
-               Map<String, Integer> expectedResults = new HashMap<String, 
Integer>();
-               expectedResults.put("bush", 2);
-               expectedResults.put("stanford", 3);
-               expectedResults.put("world", 1);
-               expectedResults.put("conference", 1);
-               
-               pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile 
+ "' USING " + TextLoader.class.getName() + "();");
-           pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + 
"';");
-
-           pigServer.registerQuery("titleNGrams = FOREACH newsArticles 
GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
-           pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 
INNER, queryLog BY $0 INNER;");
-           pigServer.registerQuery("answer = FOREACH cogrouped GENERATE 
COUNT(queryLog),group;");
-               
-           Iterator<Tuple> iter = pigServer.openIterator("answer");
-           while(iter.hasNext()){
-               Tuple t = iter.next();
-               
assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
-           }
-       }
-       
-
-       
-       @Test
-       public void testSort() throws Exception{
-               testSortDistinct(false);
-       }
-       
-
-       @Test
-       public void testDistinct() throws Exception{
-               testSortDistinct(true);
-       }
-
-       private void testSortDistinct(boolean eliminateDuplicates) throws 
Exception{
-               int LOOP_SIZE = 1024*16;
-               File tmpFile = File.createTempFile("test", "txt");
-        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
-        Random r = new Random();
-        for(int i = 0; i < LOOP_SIZE; i++) {
-            ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
-        }
-        ps.close(); 
-               
-               PigServer pig = new PigServer(initString);
-        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, 
pig.getPigContext()).toString();
-               pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
-               if (eliminateDuplicates){
-                       pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) 
PARALLEL 10;");
-               }else{
-                       pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
-               }
-               pig.store("B", tmpOutputFile);
-               
-               pig.registerQuery("A = load '" + tmpOutputFile + "';");
-               Iterator<Tuple> iter = pig.openIterator("A");
-               int last = -1;
-               while (iter.hasNext()){
-                       Tuple t = iter.next();
-                       if (eliminateDuplicates){
-                               assertTrue(last < 
t.getAtomField(0).numval().intValue());
-                       }else{
-                               assertTrue(last <= 
t.getAtomField(0).numval().intValue());
-                               assertEquals(t.arity(), 2);
-                       }
-               }
-               
-       }
-       
-
-}
+       }
+       
+       @Test
+       public void testJoin() throws Exception{
+               PigServer pigServer = new PigServer(initString);
+               
+               File f1 = createFile(new String[]{"a:1","b:1","a:1"});
+               File f2 = createFile(new String[]{"b","b","a"});
+               
+               pigServer.registerQuery("a = load 'file:" + f1 + "' using " + 
PigStorage.class.getName() + "(':');");
+               pigServer.registerQuery("b = load 'file:" + f2 + "';");
+               pigServer.registerQuery("c = cogroup a by $0, b by $0;");       
        
+               pigServer.registerQuery("d = foreach c generate 
flatten($1),flatten($2);");
+               
+               Iterator<Tuple> iter = pigServer.openIterator("d");
+               int count = 0;
+               while(iter.hasNext()){
+                       Tuple t = iter.next();
+                       
assertTrue(t.getAtomField(0).strval().equals(t.getAtomField(2).strval()));
+                       count++;
+               }
+               assertEquals(count, 4);
+       }
+       
+       @Test
+       public void testDriverMethod() throws Exception{
+               PigServer pigServer = new PigServer(initString);
+               File f = File.createTempFile("tmp", "");
+               PrintWriter pw = new PrintWriter(f);
+               pw.println("a");
+               pw.println("a");
+               pw.close();
+               pigServer.registerQuery("a = foreach (load 'file:" + f + "') 
generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
+               pigServer.registerQuery("b = foreach a generate $0, 
flatten($1);");
+               Iterator<Tuple> iter = pigServer.openIterator("a");
+               int count = 0;
+               while(iter.hasNext()){
+                       Tuple t = iter.next();
+                       assertTrue(t.getAtomField(0).strval().equals("1"));
+                       assertTrue(t.getAtomField(1).strval().equals("a"));
+                       count++;
+               }
+               assertEquals(count, 6);
+               f.delete();
+       }
+       
+       
+       @Test
+       public void testMapLookup() throws IOException{
+               PigServer pigServer = new PigServer(initString);
+               DataBag b = BagFactory.getInstance().newDefaultBag();
+               DataMap colors = new DataMap();
+               colors.put("apple","red");
+               colors.put("orange","orange");
+               
+               DataMap weights = new DataMap();
+               weights.put("apple","0.1");
+               weights.put("orange","0.3");
+               
+               Tuple t = new Tuple();
+               t.appendField(colors);
+               t.appendField(weights);
+               b.add(t);
+               
+               String fileName = "file:"+File.createTempFile("tmp", "");
+               PigFile f = new PigFile(fileName);
+               f.store(b, new BinStorage(), pigServer.getPigContext());
+               
+               
+               pigServer.registerQuery("a = load '" + fileName + "' using 
BinStorage();");
+               pigServer.registerQuery("b = foreach a generate 
$0#'apple',flatten($1#'orange');");
+               Iterator<Tuple> iter = pigServer.openIterator("b");
+               t = iter.next();
+               assertEquals(t.getAtomField(0).strval(), "red");
+               assertEquals(t.getAtomField(1).numval(), 0.3);
+               assertFalse(iter.hasNext());
+       }
+       
+       
+       static public class TitleNGrams extends EvalFunc<DataBag> {
+               
+               @Override
+               public void exec(Tuple input, DataBag output) throws 
IOException {      
+                   String str = input.getAtomField(0).strval();
+                       
+                       String title = str;
+
+                       if (title != null) {
+                               List<String> nGrams = makeNGrams(title);
+                               
+                               for (Iterator<String> it = nGrams.iterator(); 
it.hasNext(); ) {
+                                       Tuple t = new Tuple(1);
+                                       t.setField(0, it.next());
+                                       output.add(t);
+                               }
+                       }
+           }
+               
+               
+               List<String> makeNGrams(String str) {
+                       List<String> tokens = new ArrayList<String>();
+                       
+                       StringTokenizer st = new StringTokenizer(str);
+                       while (st.hasMoreTokens())
+                               tokens.add(st.nextToken());
+                       
+                       return nGramHelper(tokens, new ArrayList<String>());
+               }
+               
+               ArrayList<String> nGramHelper(List<String> str, 
ArrayList<String> nGrams) {
+                       if (str.size() == 0)
+                               return nGrams;
+                       
+                       for (int i = 0; i < str.size(); i++)
+                               nGrams.add(makeString(str.subList(0, i+1)));
+                       
+                       return nGramHelper(str.subList(1, str.size()), nGrams);
+               }
+               
+               String makeString(List<String> list) {
+                       StringBuffer sb = new StringBuffer();
+                       for (Iterator<String> it = list.iterator(); 
it.hasNext(); ) {
+                               sb.append(it.next());
+                               if (it.hasNext())
+                                       sb.append(" ");
+                       }
+                       return sb.toString();
+               }
+       }
+
+       
+       
+       
+       @Test
+       public void testBagFunctionWithFlattening() throws Exception{
+               PigServer pigServer = new PigServer(initString);
+               File queryLogFile = createFile(
+                                       new String[]{ 
+                                               "stanford\tdeer\tsighting",
+                                               "bush\tpresident",
+                                               "stanford\tbush",
+                                               "conference\tyahoo",
+                                               "world\tcup\tcricket",
+                                               "bush\twins",
+                                               "stanford\tpresident",
+                                       }
+                               );
+                               
+               File newsFile = createFile(
+                                       new String[]{
+                                               "deer seen at stanford",
+                                               "george bush visits stanford", 
+                                               "yahoo hosting a conference in 
the bay area", 
+                                               "who will win the world cup"
+                                       }
+                               );      
+               
+               Map<String, Integer> expectedResults = new HashMap<String, 
Integer>();
+               expectedResults.put("bush", 2);
+               expectedResults.put("stanford", 3);
+               expectedResults.put("world", 1);
+               expectedResults.put("conference", 1);
+               
+               pigServer.registerQuery("newsArticles = LOAD 'file:" + newsFile 
+ "' USING " + TextLoader.class.getName() + "();");
+           pigServer.registerQuery("queryLog = LOAD 'file:" + queryLogFile + 
"';");
+
+           pigServer.registerQuery("titleNGrams = FOREACH newsArticles 
GENERATE flatten(" + TitleNGrams.class.getName() + "(*));");
+           pigServer.registerQuery("cogrouped = COGROUP titleNGrams BY $0 
INNER, queryLog BY $0 INNER;");
+           pigServer.registerQuery("answer = FOREACH cogrouped GENERATE 
COUNT(queryLog),group;");
+               
+           Iterator<Tuple> iter = pigServer.openIterator("answer");
+           while(iter.hasNext()){
+               Tuple t = iter.next();
+               
assertEquals(expectedResults.get(t.getAtomField(1).strval()).doubleValue(),t.getAtomField(0).numval().doubleValue());
+           }
+       }
+       
+
+       
+       @Test
+       public void testSort() throws Exception{
+               testSortDistinct(false);
+       }
+       
+
+       @Test
+       public void testDistinct() throws Exception{
+               testSortDistinct(true);
+       }
+
+       private void testSortDistinct(boolean eliminateDuplicates) throws 
Exception{
+               int LOOP_SIZE = 1024*16;
+               File tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        Random r = new Random();
+        for(int i = 0; i < LOOP_SIZE; i++) {
+            ps.println(r.nextInt(LOOP_SIZE/2) + "\t" + i);
+        }
+        ps.close(); 
+               
+               PigServer pig = new PigServer(initString);
+        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, 
pig.getPigContext()).toString();
+               pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+               if (eliminateDuplicates){
+                       pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) 
PARALLEL 10;");
+               }else{
+                       pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+               }
+               pig.store("B", tmpOutputFile);
+               
+               pig.registerQuery("A = load '" + tmpOutputFile + "';");
+               Iterator<Tuple> iter = pig.openIterator("A");
+               int last = -1;
+               while (iter.hasNext()){
+                       Tuple t = iter.next();
+                       if (eliminateDuplicates){
+                               assertTrue(last < 
t.getAtomField(0).numval().intValue());
+                       }else{
+                               assertTrue(last <= 
t.getAtomField(0).numval().intValue());
+                               assertEquals(t.arity(), 2);
+                       }
+               }
+               
+       }
+       
+
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Jan 22 13:17:12 2008
@@ -78,9 +78,9 @@
        }
         @Override
                public void exec(Tuple input, DataBag output) throws IOException {
-            Iterator<Datum> it = (input.getBagField(0)).content();
+            Iterator<Tuple> it = (input.getBagField(0)).iterator();
             while(it.hasNext()) {
-                Tuple t = (Tuple)it.next();
+                Tuple t = it.next();
                 Tuple newT = new Tuple(2);
                 newT.setField(0, field0);
                 newT.setField(1, t.getField(0).toString());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java Tue Jan 22 13:17:12 2008
@@ -33,18 +33,14 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.PigContext;
 
 public class TestPigFile extends TestCase {
 
-    DataBag bag          = new DataBag(Datum.DataType.TUPLE);
+    DataBag bag          = BagFactory.getInstance().newDefaultBag();
     Random rand = new Random();
     
     @Override
@@ -89,13 +85,13 @@
         DataBag loaded = load.load(new PigStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Datum> it1 = bag.content();
-        Iterator<Datum> it2 = loaded.content();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
         while (it1.hasNext() && it2.hasNext()) {
-            Tuple f1 = (Tuple)it1.next();
-            Tuple f2 = (Tuple)it2.next();
+            Tuple f1 = it1.next();
+            Tuple f2 = it2.next();
             assertTrue(f1.equals(f2));
         }
         assertFalse(it1.hasNext() || it2.hasNext());
@@ -131,7 +127,7 @@
     
     private DataBag getRandomBag(int maxCardinality, int nestingLevel) throws IOException{
        int cardinality = rand.nextInt(maxCardinality)+1;
-       DataBag b = new DataBag(Datum.DataType.TUPLE);
+       DataBag b = BagFactory.getInstance().newDefaultBag();
        for (int i=0; i<cardinality; i++){
                Tuple t = getRandomTuple(nestingLevel+1); 
                b.add(t);
@@ -168,14 +164,13 @@
         DataBag loaded = load.load(new BinStorage(), pigContext);
         System.out.println("Done.");
 
-        assertTrue(bag.cardinality() == loaded.cardinality());
+        assertTrue(bag.size() == loaded.size());
 
-        Iterator<Datum> it1 = bag.content();
-        Iterator<Datum> it2 = loaded.content();
-        //while (it1.hasNext() && it2.hasNext()) {
-        for (int i = 0; it1.hasNext() && it2.hasNext(); i++) {
-            Tuple f1 = (Tuple)it1.next();
-            Tuple f2 = (Tuple)it2.next();
+        Iterator<Tuple> it1 = bag.iterator();
+        Iterator<Tuple> it2 = loaded.iterator();
+        while (it1.hasNext() && it2.hasNext()) {
+            Tuple f1 = it1.next();
+            Tuple f2 = it2.next();
             assertTrue(f1.equals(f2));
         }
         assertFalse(it1.hasNext() || it2.hasNext());

Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Tue Jan 22 13:17:12 2008
@@ -19,8 +19,7 @@
 
 import java.io.IOException;
 
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.data.*;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.DataAtom;
 
@@ -29,7 +28,7 @@
     // =================
     static public Tuple loadFlatTuple(Tuple t, int[] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            t.setField(i, new DataAtom(new Integer(input[i]).toString()));
+            t.setField(i, input[i]);
         }
         return t;
     }
@@ -42,7 +41,7 @@
     }
 
     static public Tuple loadNestTuple(Tuple t, int[] input) throws IOException {
-        DataBag bag = new DataBag(Datum.DataType.TUPLE);
+        DataBag bag = BagFactory.getInstance().newDefaultBag();
         for(int i = 0; i < input.length; i++) {
             Tuple f = new Tuple(1);
             f.setField(0, input[i]);
@@ -54,7 +53,7 @@
 
     static public Tuple loadNestTuple(Tuple t, int[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag(Datum.DataType.TUPLE);
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadFlatTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);
@@ -64,7 +63,7 @@
 
     static public Tuple loadTuple(Tuple t, String[][] input) throws IOException {
         for (int i = 0; i < input.length; i++) {
-            DataBag bag = new DataBag(Datum.DataType.TUPLE);
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
             Tuple f = loadTuple(new Tuple(input[i].length), input[i]);
             bag.add(f);
             t.setField(i, bag);


Reply via email to