Author: thejas Date: Tue Sep 14 17:59:26 2010 New Revision: 997020 URL: http://svn.apache.org/viewvc?rev=997020&view=rev Log: PIG-1589: add test cases for mapreduce operator which use distributed cache
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=997020&r1=997019&r2=997020&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Tue Sep 14 17:59:26 2010 @@ -36,6 +36,8 @@ PIG-1249: Safe-guards against misconfigu IMPROVEMENTS +PIG-1589: add test cases for mapreduce operator which use distributed cache (thejas) + PIG-1548: Optimize scalar to consolidate the part file (rding) PIG-1600: Docs update (chandec via olgan) Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=997020&r1=997019&r2=997020&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Tue Sep 14 17:59:26 2010 @@ -21,18 +21,17 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Random; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.pigstats.PigStats; import org.junit.AfterClass; import org.junit.Before; @@ -53,7 +52,12 @@ public 
class TestNativeMapReduce { // http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822 private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar"; private String exp_msg_prefix = "Check if expected results contains: "; - final static String INPUT_FILE = "TestMapReduceInputFile"; + final static String INPUT_FILE = "TestNMapReduceInputFile"; + /** + *stop word file - used to test distributed cache usage, words in this + * file if specified will be skipped by the wordcount udf + */ + final static String STOPWORD_FILE = "TestNMapReduceStopwFile"; static MiniCluster cluster = MiniCluster.buildCluster(); private PigServer pigServer = null; @@ -72,19 +76,38 @@ public class TestNativeMapReduce { "two", "three" }; + //for stop word file + String[] stopw = { + "one" + }; + Util.createInputFile(cluster, INPUT_FILE, input); + Util.createLocalInputFile(STOPWORD_FILE, stopw); } + // createWordCountJar(){ + // // its a manual process + // javac -cp build/ivy/lib/Pig/hadoop-core-0.20.2.jar:build/ivy/lib/Pig/commons-cli-1.2.jar test/org/apache/pig/test/utils/WordCount.java + // cd test/ + // jar -cf WordCount.jar org/apache/pig/test/utils/WordCount*class + // mv WordCount.jar org/apache/pig/test/data/TestWordCount.jar + // + // + //} + @Before public void setUp() throws Exception{ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); //createWordCountJar(); } + + @AfterClass public static void oneTimeTearDown() throws Exception { Util.deleteFile(cluster, INPUT_FILE); + new File(STOPWORD_FILE).delete(); cluster.shutDown(); } @@ -94,16 +117,20 @@ public class TestNativeMapReduce { public void testNativeMRJobSimple() throws Exception{ try{ Collection<String> results = new HashSet<String>(); - results.add("(one,1)"); results.add("(two,2)"); results.add("(three,3)"); pigServer.setBatchOn(); pigServer.registerQuery("A = load '" + INPUT_FILE + "';"); + + + //also test distributed cache 
using the stopwords file for udf pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobSimple_input' "+ "Load 'table_testNativeMRJobSimple_output' "+ - "`WordCount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`;"); + "`org.apache.pig.test.utils.WordCount -files " + STOPWORD_FILE + + " table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output " + + STOPWORD_FILE + "`;"); pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';"); List<ExecJob> execJobs = pigServer.executeBatch(); @@ -131,10 +158,6 @@ public class TestNativeMapReduce { t = iter.next(); assertTrue(exp_msg_prefix + t, results.contains(t.toString())); - assertTrue("iter.hasNext()",iter.hasNext()); - t = iter.next(); - assertTrue(exp_msg_prefix + t, results.contains(t.toString())); - assertFalse(iter.hasNext()); // We have to manually delete intermediate mapreduce files @@ -152,10 +175,6 @@ public class TestNativeMapReduce { t = iter.next(); assertTrue(exp_msg_prefix + t, results.contains(t.toString())); - assertTrue("iter.hasNext()",iter.hasNext()); - t = iter.next(); - assertTrue(exp_msg_prefix + t, results.contains(t.toString())); - assertFalse(iter.hasNext()); } finally{ @@ -185,7 +204,7 @@ public class TestNativeMapReduce { pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobSimple_input' "+ "Load 'table_testNativeMRJobSimple_output' "+ - "`WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + "`;"); + "`org.apache.pig.test.utils.WordCount table_testNativeMRJobSimple_input " + INPUT_FILE + "`;"); pigServer.registerQuery("Store B into 'table_testNativeMRJobSimpleDir';"); // List<ExecJob> execJobs = pigServer.executeBatch(); @@ -217,7 +236,7 @@ public class TestNativeMapReduce { pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobMultiStoreOnPred_input' "+ "Load 
'table_testNativeMRJobMultiStoreOnPred_output' "+ - "`WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;"); + "`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiStoreOnPred_input table_testNativeMRJobMultiStoreOnPred_output`;"); pigServer.registerQuery("Store B into 'table_testNativeMRJobMultiStoreOnPredDir';"); pigServer.executeBatch(); @@ -282,11 +301,11 @@ public class TestNativeMapReduce { pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobMultiQueryOpt_inputB' "+ "Load 'table_testNativeMRJobMultiQueryOpt_outputB' "+ - "`WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;"); + "`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiQueryOpt_inputB table_testNativeMRJobMultiQueryOpt_outputB`;"); pigServer.registerQuery("C = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobMultiQueryOpt_inputC' "+ "Load 'table_testNativeMRJobMultiQueryOpt_outputC' "+ - "`WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;"); + "`org.apache.pig.test.utils.WordCount table_testNativeMRJobMultiQueryOpt_inputC table_testNativeMRJobMultiQueryOpt_outputC`;"); Iterator<Tuple> iter = pigServer.openIterator("C"); Tuple t; @@ -341,7 +360,7 @@ public class TestNativeMapReduce { pigServer.registerQuery("B = mapreduce '" + jarFileName + "' " + "Store A into 'table_testNativeMRJobTypeCastInserter_input' "+ "Load 'table_testNativeMRJobTypeCastInserter_output' as (name:chararray, count: int)"+ - "`WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;"); + "`org.apache.pig.test.utils.WordCount table_testNativeMRJobTypeCastInserter_input table_testNativeMRJobTypeCastInserter_output`;"); pigServer.registerQuery("C = foreach B generate count+1;"); Iterator<Tuple> iter = pigServer.openIterator("C"); Modified: 
hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/TestWordCount.jar?rev=997020&r1=997019&r2=997020&view=diff ============================================================================== Binary files - no diff available. Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java?rev=997020&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/WordCount.java Tue Sep 14 17:59:26 2010 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.pig.test.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/**
+ * Word-count MapReduce job with optional stop-word filtering.
+ *
+ * <p>Usage: {@code wordcount <in> <out> [stop word file]}. When the optional
+ * third argument is given, its value is stored in the job configuration under
+ * {@link #STOP_WORDS_FILE} and every word listed in that file (one per line)
+ * is skipped by the mapper.
+ */
+public class WordCount {
+
+    /** Configuration key holding the name of the stop-word file. */
+    final static String STOP_WORDS_FILE = "wc.stopwords.file";
+
+    /**
+     * Splits each input value into whitespace-delimited tokens and emits
+     * {@code (token, 1)} for every token that is not a stop word.
+     */
+    public static class TokenizerMapper
+            extends Mapper<Object, Text, Text, IntWritable> {
+
+        private final static IntWritable one = new IntWritable(1);
+
+        /** Words to skip; stays empty when no stop-word file is configured. */
+        final Set<String> stopWords = new HashSet<String>();
+
+        private Text word = new Text();
+
+        /**
+         * Loads the stop-word file named by {@link #STOP_WORDS_FILE} into
+         * {@link #stopWords}. Does nothing when the property is not set.
+         *
+         * <p>The name is opened with {@link java.io.File}, i.e. resolved on the
+         * local file system of the task. NOTE(review): presumably the file is
+         * shipped to the task's working directory with the {@code -files}
+         * generic option - confirm against the caller.
+         *
+         * @param context task context supplying the job configuration
+         * @throws RuntimeException if the file cannot be opened or read
+         */
+        @Override
+        protected void setup(Context context) {
+            Configuration conf = context.getConfiguration();
+            String stopWordsFileName = conf.get(STOP_WORDS_FILE);
+            if (stopWordsFileName == null) {
+                return;
+            }
+
+            BufferedReader reader;
+            try {
+                reader = new BufferedReader(
+                        new FileReader(new File(stopWordsFileName)));
+            } catch (FileNotFoundException e) {
+                throw new RuntimeException(
+                        "Could not open stopwords file " + stopWordsFileName, e);
+            }
+
+            try {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    // Tokens produced in map() never contain whitespace, so an
+                    // untrimmed entry could never match; blank lines carry no word.
+                    String stopWord = line.trim();
+                    if (stopWord.length() > 0) {
+                        stopWords.add(stopWord);
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("error while reading stopwords", e);
+            } finally {
+                // Always release the file handle, also when reading failed.
+                try {
+                    reader.close();
+                } catch (IOException ignored) {
+                    // Best-effort close: the file was only read, so a failure
+                    // here cannot lose data and must not mask a read error.
+                }
+            }
+        }
+
+        /**
+         * Emits {@code (token, 1)} for each token of {@code value} that is not
+         * contained in {@link #stopWords}.
+         *
+         * @param key     input key (unused)
+         * @param value   one record of input text
+         * @param context sink for the emitted pairs
+         */
+        @Override
+        public void map(Object key, Text value, Context context)
+                throws IOException, InterruptedException {
+            StringTokenizer itr = new StringTokenizer(value.toString());
+            while (itr.hasMoreTokens()) {
+                String token = itr.nextToken();
+                if (stopWords.contains(token)) {
+                    continue;
+                }
+                word.set(token);
+                context.write(word, one);
+            }
+        }
+    }
+
+    /**
+     * Sums all counts received for a key and emits {@code (key, sum)}.
+     * Registered by {@link #main} as both combiner and reducer.
+     */
+    public static class IntSumReducer
+            extends Reducer<Text, IntWritable, Text, IntWritable> {
+
+        private IntWritable result = new IntWritable();
+
+        /**
+         * @param key     the word
+         * @param values  partial counts for {@code key}
+         * @param context sink for the emitted pair
+         */
+        @Override
+        public void reduce(Text key, Iterable<IntWritable> values, Context context)
+                throws IOException, InterruptedException {
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+            }
+            result.set(sum);
+            context.write(key, result);
+        }
+    }
+
+    /**
+     * Configures and runs the job, then terminates the JVM: exit code 0 when
+     * the job succeeds, 1 when it fails, 2 when fewer than two non-generic
+     * arguments are supplied.
+     *
+     * @param args generic Hadoop options followed by
+     *             {@code <in> <out> [stop word file]}
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration conf = new Configuration();
+        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+        if (otherArgs.length < 2) {
+            System.err.println("Usage: wordcount <in> <out> [wordcount stop word file]");
+            System.exit(2);
+        }
+        Job job = new Job(conf, "word count");
+        job.setJarByClass(WordCount.class);
+        job.setMapperClass(TokenizerMapper.class);
+        job.setCombinerClass(IntSumReducer.class);
+        job.setReducerClass(IntSumReducer.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
+        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
+        if (otherArgs.length > 2) {
+            // Optional third argument: name of the stop-word file read in setup().
+            job.getConfiguration().set(STOP_WORDS_FILE, otherArgs[2]);
+        }
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}