Author: thejas
Date: Tue Sep 14 17:59:26 2010
New Revision: 997020

URL: http://svn.apache.org/viewvc?rev=997020&view=rev
Log:
PIG-1589: add test cases for mapreduce operator which use distributed cache

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=997020&r1=997019&r2=997020&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Sep 14 17:59:26 2010
@@ -36,6 +36,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1589: add test cases for mapreduce operator which use distributed cache 
(thejas)
+
 PIG-1548: Optimize scalar to consolidate the part file (rding)
 
 PIG-1600: Docs update (chandec via olgan)

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=997020&r1=997019&r2=997020&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Tue Sep 
14 17:59:26 2010
@@ -21,18 +21,17 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -53,7 +52,12 @@ public class TestNativeMapReduce  {
     // 
http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
     private String jarFileName = 
"test//org/apache/pig/test/data/TestWordCount.jar";
     private String exp_msg_prefix = "Check if expected results contains: ";
-    final static String INPUT_FILE = "TestMapReduceInputFile";
+    final static String INPUT_FILE = "TestNMapReduceInputFile";
+    /**
+     * Stop-word file - used to test distributed cache usage; words in this
+     * file, if specified, will be skipped by the wordcount UDF.
+     */
+    final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
     static MiniCluster cluster = MiniCluster.buildCluster();
     private PigServer pigServer = null;
     
@@ -72,19 +76,38 @@ public class TestNativeMapReduce  {
                 "two",
                 "three"
         };
+        //for stop word file
+        String[] stopw = {
+                "one"
+        };
+   
         Util.createInputFile(cluster, INPUT_FILE, input);
+        Util.createLocalInputFile(STOPWORD_FILE, stopw);
     }
 
+    //  createWordCountJar(){
+    //  // its a manual process 
+    //  javac -cp 
build/ivy/lib/Pig/hadoop-core-0.20.2.jar:build/ivy/lib/Pig/commons-cli-1.2.jar 
test/org/apache/pig/test/utils/WordCount.java 
+    //  cd test/
+    //  jar -cf WordCount.jar org/apache/pig/test/utils/WordCount*class
+    //  mv WordCount.jar org/apache/pig/test/data/TestWordCount.jar
+    //
+    //  
+    //}
+
     @Before
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
         //createWordCountJar();
     }
+    
+
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
+        new File(STOPWORD_FILE).delete();
         cluster.shutDown();
     }
       
@@ -94,16 +117,20 @@ public class TestNativeMapReduce  {
     public void testNativeMRJobSimple() throws Exception{
         try{
             Collection<String> results = new HashSet<String>();
-            results.add("(one,1)");
             results.add("(two,2)");
             results.add("(three,3)");
 
             pigServer.setBatchOn();
             pigServer.registerQuery("A = load '" + INPUT_FILE + "';");
+
+
+            //also test distributed cache using the stopwords file for udf
             pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
                     "Store A into 'table_testNativeMRJobSimple_input' "+
                     "Load 'table_testNativeMRJobSimple_output' "+
-            "`WordCount table_testNativeMRJobSimple_input 
table_testNativeMRJobSimple_output`;");
+            "`org.apache.pig.test.utils.WordCount -files " + STOPWORD_FILE +
+            " table_testNativeMRJobSimple_input 
table_testNativeMRJobSimple_output " +
+            STOPWORD_FILE + "`;");
             pigServer.registerQuery("Store B into 
'table_testNativeMRJobSimpleDir';");
             List<ExecJob> execJobs = pigServer.executeBatch();
 
@@ -131,10 +158,6 @@ public class TestNativeMapReduce  {
             t = iter.next();
             assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-            assertTrue("iter.hasNext()",iter.hasNext());
-            t = iter.next();
-            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
             assertFalse(iter.hasNext());
 
             // We have to manually delete intermediate mapreduce files
@@ -152,10 +175,6 @@ public class TestNativeMapReduce  {
             t = iter.next();
             assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
 
-            assertTrue("iter.hasNext()",iter.hasNext());
-            t = iter.next();
-            assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
-
             assertFalse(iter.hasNext());
         }
         finally{
@@ -185,7 +204,7 @@ public class TestNativeMapReduce  {
             pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
                     "Store A into 'table_testNativeMRJobSimple_input' "+
                     "Load 'table_testNativeMRJobSimple_output' "+
-            "`WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + 
"`;");
+            "`org.apache.pig.test.utils.WordCount 
table_testNativeMRJobSimple_input " + INPUT_FILE + "`;");
             pigServer.registerQuery("Store B into 
'table_testNativeMRJobSimpleDir';");
 //            List<ExecJob> execJobs = pigServer.executeBatch();
 
@@ -217,7 +236,7 @@ public class TestNativeMapReduce  {
             pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
                     "Store A into 
'table_testNativeMRJobMultiStoreOnPred_input' "+
                     "Load 'table_testNativeMRJobMultiStoreOnPred_output' "+
-            "`WordCount table_testNativeMRJobMultiStoreOnPred_input 
table_testNativeMRJobMultiStoreOnPred_output`;");
+            "`org.apache.pig.test.utils.WordCount 
table_testNativeMRJobMultiStoreOnPred_input 
table_testNativeMRJobMultiStoreOnPred_output`;");
             pigServer.registerQuery("Store B into 
'table_testNativeMRJobMultiStoreOnPredDir';");
             pigServer.executeBatch();
 
@@ -282,11 +301,11 @@ public class TestNativeMapReduce  {
             pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
                     "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' 
"+
                     "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+
-            "`WordCount table_testNativeMRJobMultiQueryOpt_inputB 
table_testNativeMRJobMultiQueryOpt_outputB`;");
+            "`org.apache.pig.test.utils.WordCount 
table_testNativeMRJobMultiQueryOpt_inputB 
table_testNativeMRJobMultiQueryOpt_outputB`;");
             pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " +
                     "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' 
"+
                     "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+
-            "`WordCount table_testNativeMRJobMultiQueryOpt_inputC 
table_testNativeMRJobMultiQueryOpt_outputC`;");
+            "`org.apache.pig.test.utils.WordCount 
table_testNativeMRJobMultiQueryOpt_inputC 
table_testNativeMRJobMultiQueryOpt_outputC`;");
 
             Iterator<Tuple> iter = pigServer.openIterator("C");
             Tuple t;
@@ -341,7 +360,7 @@ public class TestNativeMapReduce  {
             pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " +
                     "Store A into 
'table_testNativeMRJobTypeCastInserter_input' "+
                     "Load 'table_testNativeMRJobTypeCastInserter_output' as 
(name:chararray, count: int)"+
-            "`WordCount table_testNativeMRJobTypeCastInserter_input 
table_testNativeMRJobTypeCastInserter_output`;");
+            "`org.apache.pig.test.utils.WordCount 
table_testNativeMRJobTypeCastInserter_input 
table_testNativeMRJobTypeCastInserter_output`;");
             pigServer.registerQuery("C = foreach B generate count+1;");
 
             Iterator<Tuple> iter = pigServer.openIterator("C");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar?rev=997020&r1=997019&r2=997020&view=diff
==============================================================================
Binary files - no diff available.

Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java?rev=997020&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java Tue Sep 14 
17:59:26 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+    final static String STOP_WORDS_FILE = "wc.stopwords.file";
+    public static class TokenizerMapper 
+    extends Mapper<Object, Text, Text, IntWritable>{
+        Set<String> stopWords = new HashSet<String>();
+        
+        /* (non-Javadoc)
+         * @see 
org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+         * load stop words file into stopWords set
+         */
+        @Override
+        protected void setup(Mapper.Context context){
+
+            Configuration conf = context.getConfiguration();
+            String stp_file_name = conf.get(STOP_WORDS_FILE);
+            if(stp_file_name == null)
+                return;
+            File stp_file = new File(stp_file_name);
+            BufferedReader fis;
+            try {
+                fis = new BufferedReader(new FileReader(stp_file));
+            } catch (FileNotFoundException e) {
+                e.printStackTrace();
+                throw new RuntimeException("Could not open stopwords file ",e);
+            }
+            String word;
+            try {
+                while((word =fis.readLine()) != null){
+                    stopWords.add(word);
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+                throw new RuntimeException("error while reading stopwords",e);
+            }
+        }
+
+        
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+
+        public void map(Object key, Text value, Context context
+        ) throws IOException, InterruptedException {
+            StringTokenizer itr = new StringTokenizer(value.toString());
+            while (itr.hasMoreTokens()) {
+                word.set(itr.nextToken());
+                if(stopWords.contains(word.toString())){
+                    continue;
+                }
+                context.write(word, one);
+            }
+        }
+    }
+
+    public static class IntSumReducer 
+    extends Reducer<Text,IntWritable,Text,IntWritable> {
+        private IntWritable result = new IntWritable();
+        public void reduce(Text key, Iterable<IntWritable> values, 
+                Context context
+        ) throws IOException, InterruptedException {
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+            }
+            result.set(sum);
+            context.write(key, result);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        String[] otherArgs = new GenericOptionsParser(conf, 
args).getRemainingArgs();
+        if (otherArgs.length < 2) {
+            System.err.println("Usage: wordcount <in> <out> [wordcount stop 
word file]");
+            System.exit(2);
+        }
+        Job job = new Job(conf, "word count");
+        job.setJarByClass(WordCount.class);
+        job.setMapperClass(TokenizerMapper.class);
+        job.setCombinerClass(IntSumReducer.class);
+        job.setReducerClass(IntSumReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+        if(otherArgs.length > 2){
+            job.getConfiguration().set(STOP_WORDS_FILE, otherArgs[2]);
+        }
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}


Reply via email to