Author: thejas
Date: Tue Sep 14 17:59:58 2010
New Revision: 997021

URL: http://svn.apache.org/viewvc?rev=997021&view=rev
Log:
PIG-1589: add test cases for mapreduce operator which use distributed cache - adding missing file

Added:
    hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/utils/WordCount.java

Added: hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/utils/WordCount.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/utils/WordCount.java?rev=997021&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/utils/WordCount.java (added)
+++ hadoop/pig/branches/branch-0.8/test/org/apache/pig/test/utils/WordCount.java Tue Sep 14 17:59:58 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class WordCount {
+    final static String STOP_WORDS_FILE = "wc.stopwords.file";
+    public static class TokenizerMapper 
+    extends Mapper<Object, Text, Text, IntWritable>{
+        Set<String> stopWords = new HashSet<String>();
+        
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
+         * Load the stop words file into the stopWords set. The file is opened
+         * from the task-local file system, so it is expected to have been made
+         * available locally (for example through the distributed cache).
+         */
+        @Override
+        protected void setup(Context context){
+
+            Configuration conf = context.getConfiguration();
+            String stp_file_name = conf.get(STOP_WORDS_FILE);
+            if(stp_file_name == null)
+                return;
+            File stp_file = new File(stp_file_name);
+            BufferedReader fis;
+            try {
+                fis = new BufferedReader(new FileReader(stp_file));
+            } catch (FileNotFoundException e) {
+                throw new RuntimeException("Could not open stopwords file " + stp_file_name, e);
+            }
+            String word;
+            try {
+                while((word = fis.readLine()) != null){
+                    stopWords.add(word);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("error while reading stopwords", e);
+            } finally {
+                try {
+                    fis.close();
+                } catch (IOException e) {
+                    // ignore errors while closing the stop words file
+                }
+            }
+        }
+
+        
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+
+        public void map(Object key, Text value, Context context
+        ) throws IOException, InterruptedException {
+            StringTokenizer itr = new StringTokenizer(value.toString());
+            while (itr.hasMoreTokens()) {
+                word.set(itr.nextToken());
+                if(stopWords.contains(word.toString())){
+                    continue;
+                }
+                context.write(word, one);
+            }
+        }
+    }
+
+    public static class IntSumReducer 
+    extends Reducer<Text,IntWritable,Text,IntWritable> {
+        private IntWritable result = new IntWritable();
+        public void reduce(Text key, Iterable<IntWritable> values, 
+                Context context
+        ) throws IOException, InterruptedException {
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+            }
+            result.set(sum);
+            context.write(key, result);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+        if (otherArgs.length < 2) {
+            System.err.println("Usage: wordcount <in> <out> [wordcount stop word file]");
+            System.exit(2);
+        }
+        Job job = new Job(conf, "word count");
+        job.setJarByClass(WordCount.class);
+        job.setMapperClass(TokenizerMapper.class);
+        job.setCombinerClass(IntSumReducer.class);
+        job.setReducerClass(IntSumReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+        if(otherArgs.length > 2){
+            job.getConfiguration().set(STOP_WORDS_FILE, otherArgs[2]);
+        }
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}
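
For context on how this class is meant to be used: TokenizerMapper opens the stop words file with java.io.FileReader, so the file has to be present on the task's local file system, which is what the distributed-cache-based tests mentioned in the log rely on. The driver below is only a minimal sketch of one way to wire that up with the org.apache.hadoop.filecache.DistributedCache API; the driver class name, the HDFS path, and the symlink name are illustrative assumptions, not part of this change.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.pig.test.utils.WordCount;

public class WordCountWithCacheDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Ship the stop words file to every task and symlink it into the task
        // working directory as "stopwords.txt". The HDFS path is illustrative;
        // the file must already exist on the cluster's default file system.
        DistributedCache.addCacheFile(new URI("/user/pig/stopwords.txt#stopwords.txt"), conf);
        DistributedCache.createSymlink(conf);

        // Same key as WordCount.STOP_WORDS_FILE; the mapper resolves this path
        // relative to the task working directory, where the symlink appears.
        conf.set("wc.stopwords.file", "stopwords.txt");

        Job job = new Job(conf, "word count with stop words");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The same effect is also available through WordCount.main() itself: since it goes through GenericOptionsParser, the file can be shipped with the generic -files option and its symlinked name passed as the third program argument.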

