Author: yanz Date: Tue Mar 2 01:08:55 2010 New Revision: 917830 URL: http://svn.apache.org/viewvc?rev=917830&view=rev Log: PIG-1164 Addition of smoke tests (gauravj via yanz)
Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=917830&r1=917829&r2=917830&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Tue Mar 2 01:08:55 2010 @@ -4,6 +4,8 @@ INCOMPATIBLE CHANGES + PIG-1164 Addition of smoke tests (gauravj via yanz) + IMPROVEMENTS PIG-1206 Storing descendingly sorted PIG table as unsorted table (yanz) Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml?rev=917830&r1=917829&r2=917830&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml (original) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/build.xml Tue Mar 2 01:08:55 2010 @@ -183,6 +183,9 @@ <not> <filename name="**/TestColumnSecurity.java"/> </not> + <not> + <filename name="**/TestSmoke*.java"/> + </not> </fileset> </batchtest> @@ -292,4 +295,30 @@ </current> </clover-report> </target> + <target name="smoke-jar" depends="compile,test"> + <jar destfile="${build.test}/zebra_smoke.jar" + basedir="${build.test}" + includes="**/TestSmokeMR*.class, **/TestTableLoaderP*.class" + /> + </target> + <target name="package-tests" depends="smoke-jar"> + <tar longfile="gnu" destfile="${build.test}/zebra_smoke.tar"> + <tarfileset dir="${src.test}/smoke" + 
fullpath="bin/zebra_smoke_run.pl" + preserveLeadingSlashes="true"> + <include name="zebra_smoke_run.pl"/> + </tarfileset> + <tarfileset dir="${build.test}/../../../ivy/lib/Pig" + fullpath="lib/junit-4.5.jar" + preserveLeadingSlashes="true"> + <include name="junit-4.5.jar"/> + </tarfileset> + <tarfileset dir="${build.test}" + fullpath="lib/zebra_smoke.jar" + preserveLeadingSlashes="true"> + <include name="zebra_smoke.jar"/> + </tarfileset> + </tar> + </target> </project> + Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java?rev=917830&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java (added) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestSmokeMR.java Tue Mar 2 01:08:55 2010 @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.zebra.mapred; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.zebra.mapred.BasicTableOutputFormat; +import org.apache.hadoop.zebra.mapred.TestBasicTableIOFormatLocalFS.InvIndex; +import org.apache.hadoop.zebra.parser.ParseException; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.hadoop.zebra.types.ZebraTuple; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DefaultTuple; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniCluster; +import org.junit.Assert; +import org.junit.BeforeClass; + +/** + * This is a sample a complete MR sample code for Table. 
It doens't contain + * 'read' part. But, it should be similar and easier to write. Refer to test + * cases in the same directory. + * + * Assume the input files contain rows of word and count, separated by a space: + * + * <pre> + * this 2 + * is 1 + * a 4 + * test 2 + * hello 1 + * world 3 + * </pre> + * + */ +public class TestSmokeMR extends Configured implements Tool{ + static String inputPath; + static String outputPath; + static String inputFileName = "smoke.txt"; + static String outputTableName ="smokeTable"; + protected static ExecType execType = ExecType.MAPREDUCE; + private static MiniCluster cluster; + protected static PigServer pigServer; + private static Configuration conf; + public static String sortKey = null; + + private static FileSystem fs; + + private static String zebraJar; + private static String whichCluster; + + static class MapClass implements + Mapper<LongWritable, Text, BytesWritable, Tuple> { + private BytesWritable bytesKey; + private Tuple tupleRow; + private Object javaObj; + + @Override + public void map(LongWritable key, Text value, + OutputCollector<BytesWritable, Tuple> output, Reporter reporter) + throws IOException { + // value should contain "word count" + String[] wdct = value.toString().split(" "); + if (wdct.length != 2) { + // LOG the error + return; + } + + byte[] word = wdct[0].getBytes(); + bytesKey.set(word, 0, word.length); + tupleRow.set(0, new String(word)); + tupleRow.set(1, Integer.parseInt(wdct[1])); + + // This key has to be created by user + Tuple userKey = new ZebraTuple(); + userKey.append(new String(word)); + userKey.append(Integer.parseInt(wdct[1])); + try { + + /* New M/R Interface */ + /* Converts user key to zebra BytesWritable key */ + /* using sort key expr tree */ + /* Returns a java base object */ + /* Done for each user key */ + + bytesKey = BasicTableOutputFormat.getSortKey(javaObj, userKey); + } catch(Exception e) { + + } + + output.collect(bytesKey, tupleRow); + } + + @Override + public void 
configure(JobConf job) { + bytesKey = new BytesWritable(); + try { + Schema outSchema = BasicTableOutputFormat.getSchema(job); + tupleRow = TypesUtils.createTuple(outSchema); + + /* New M/R Interface */ + /* returns an expression tree for sort keys */ + /* Returns a java base object */ + /* Done once per table */ + javaObj = BasicTableOutputFormat.getSortKeyGenerator(job); + + } catch (IOException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + // no-op + } + } + + static class ReduceClass implements + Reducer<BytesWritable, Tuple, BytesWritable, Tuple> { + Tuple outRow; + + + @Override + public void configure(JobConf job) { + } + + @Override + public void close() throws IOException { + } + public void reduce(BytesWritable key, Iterator<Tuple> values, + OutputCollector<BytesWritable, Tuple> output, Reporter reporter) + throws IOException { + try { + for(; values.hasNext();) { + output.collect(key, values.next()); + } + } catch (ExecException e) { + e.printStackTrace(); + } + } + + } + + @BeforeClass + public static void setUpOnce() throws IOException { + if (System.getenv("hadoop.log.dir") == null) { + String base = new File(".").getPath(); // getAbsolutePath(); + System + .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs"); + } + + if (System.getProperty("whichCluster") == null) { + System.setProperty("whichCluster", "realCluster"); + System.out.println("should be called"); + whichCluster = System.getProperty("whichCluster"); + } else { + whichCluster = System.getProperty("whichCluster"); + } + + System.out.println("clusterddddd: " + whichCluster); + System.out.println(" get env hadoop home: " + System.getenv("HADOOP_HOME")); + System.out.println(" get env user name: " + System.getenv("USER")); + if ((whichCluster.equalsIgnoreCase("realCluster") && System + .getenv("HADOOP_HOME") == null)) { + System.out.println("Please set 
HADOOP_HOME"); + System.exit(0); + } + + if ( conf == null ) { + conf = new Configuration(); + } + + if ((whichCluster.equalsIgnoreCase("realCluster") && System.getenv("USER") == null)) { + System.out.println("Please set USER"); + System.exit(0); + } +// zebraJar = System.getenv("HADOOP_HOME") + "/lib/zebra.jar"; + zebraJar = System.getenv("ZEBRA_JAR"); + File file = new File(zebraJar); + if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) { + System.out.println("Please put zebra.jar at hadoop_home/lib"); + // System.exit(0); + } + + // set inputPath and outPath + String workingDir = null; + if (whichCluster.equalsIgnoreCase("realCluster")) { + inputPath = new String("/user/" + System.getenv("USER") + "/" + + inputFileName); + System.out.println("inputPath: " + inputPath); + outputPath = new String("/user/" + System.getenv("USER") + "/" +outputTableName); + fs = new Path(inputPath).getFileSystem(conf); + + } else { + RawLocalFileSystem rawLFS = new RawLocalFileSystem(); + fs = new LocalFileSystem(rawLFS); + workingDir = fs.getWorkingDirectory().toString().split(":")[1]; + inputPath = new String(workingDir + "/" + inputFileName); + outputPath = new String(workingDir + "/" + outputTableName); + System.out.println("inputPath: " + inputPath); + + } + writeToFile(inputPath); + // check inputPath existence + File inputFile = new File(inputPath); + if (!inputFile.exists() && whichCluster.equalsIgnoreCase("realCluster")) { + System.out.println("Please put inputFile in hdfs: " + inputPath); + // System.exit(0); + } + if (!inputFile.exists() && whichCluster.equalsIgnoreCase("miniCluster")) { + System.out + .println("Please put inputFile under workingdir. 
working dir is : " + + workingDir); + System.exit(0); + } + + if (whichCluster.equalsIgnoreCase("realCluster")) { + pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil + .toProperties(conf)); + pigServer.registerJar(zebraJar); + + } + + if (whichCluster.equalsIgnoreCase("miniCluster")) { + if (execType == ExecType.MAPREDUCE) { + cluster = MiniCluster.buildCluster(); + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + fs = cluster.getFileSystem(); + + } else { + pigServer = new PigServer(ExecType.LOCAL); + } + } + } + + public static void writeToFile(String inputFile) throws IOException { + if (whichCluster.equalsIgnoreCase("miniCluster")) { + FileWriter fstream = new FileWriter(inputFile); + BufferedWriter out = new BufferedWriter(fstream); + out.write("us 2\n"); + out.write("japan 2\n"); + out.write("india 4\n"); + out.write("us 2\n"); + out.write("japan 1\n"); + out.write("india 3\n"); + out.write("nouse 5\n"); + out.write("nowhere 4\n"); + out.close(); + } + if (whichCluster.equalsIgnoreCase("realCluster")) { + FSDataOutputStream fout = fs.create(new Path(inputFile)); + fout.writeBytes("us 2\n"); + fout.writeBytes("japan 2\n"); + fout.writeBytes("india 4\n"); + fout.writeBytes("us 2\n"); + fout.writeBytes("japan 1\n"); + fout.writeBytes("india 3\n"); + fout.writeBytes("nouse 5\n"); + fout.writeBytes("nowhere 4\n"); + fout.close(); + } + } + public static void checkTableExists(boolean expected, String strDir) + throws IOException { + + File theDir = null; + boolean actual = false; + if (whichCluster.equalsIgnoreCase("miniCluster")) { + theDir = new File(strDir.split(":")[1]); + actual = theDir.exists(); + + } + if (whichCluster.equalsIgnoreCase("realCluster")) { + theDir = new File(strDir.split(":")[0]); + actual = fs.exists(new Path(theDir.toString())); + } + System.out.println("the dir : " + theDir.toString()); + + if (actual != expected) { + Assert.fail("dir exists or not is different from what expected."); + } + } + public 
static void removeDir(Path outPath) throws IOException { + String command = null; + if (whichCluster.equalsIgnoreCase("realCluster")) { + command = System.getenv("HADOOP_HOME") + "/bin/hadoop fs -rmr " + + outPath.toString(); + } else { + StringTokenizer st = new StringTokenizer(outPath.toString(), ":"); + int count = 0; + String file = null; + while (st.hasMoreElements()) { + count++; + String token = st.nextElement().toString(); + if (count == 2) + file = token; + } + command = "rm -rf " + file; + } + Runtime runtime = Runtime.getRuntime(); + Process proc = runtime.exec(command); + int exitVal = -1; + try { + exitVal = proc.waitFor(); + } catch (InterruptedException e) { + System.err.println(e); + } + + } + public static void main(String[] args) throws ParseException, IOException, Exception { + int res = ToolRunner.run(new Configuration(), new TestSmokeMR(), args); + System.out.println("res: "+res); + checkTableExists(true, outputPath); + if (res == 0) { + System.out.println("TEST PASSED!"); + } else { + System.out.println("TEST FAILED"); + throw new IOException("Zebra MR Smoke Test Failed"); + } + + } + + @Override + public int run(String[] arg0) throws Exception { + conf = getConf(); + TestSmokeMR.setUpOnce(); + removeDir(new Path(outputPath)); + JobConf jobConf = new JobConf(conf,TestSmokeMR.class); + + jobConf.setJobName("TableMRSortedTableZebraKeyGenerator"); + jobConf.set("table.output.tfile.compression", "gz"); + jobConf.setJarByClass(TestSmokeMR.class); + // input settings + jobConf.setInputFormat(TextInputFormat.class); + jobConf.setMapperClass(TestSmokeMR.MapClass.class); + jobConf.setReducerClass(TestSmokeMR.ReduceClass.class); + jobConf.setMapOutputKeyClass(BytesWritable.class); + jobConf.setMapOutputValueClass(ZebraTuple.class); + FileInputFormat.setInputPaths(jobConf, inputPath); + jobConf.setNumMapTasks(1); + + // output settings + jobConf.setOutputFormat(BasicTableOutputFormat.class); + BasicTableOutputFormat.setOutputPath(jobConf, new 
Path(outputPath)); + // set the logical schema with 2 columns + BasicTableOutputFormat.setSchema(jobConf, "word:string, count:int"); + // for demo purposes, create 2 physical column groups + BasicTableOutputFormat.setStorageHint(jobConf, "[word];[count]"); + + /* New M/R Interface */ + /* Set sort columns in a comma separated string */ + /* Each sort column should belong to schema columns */ + BasicTableOutputFormat.setSortInfo(jobConf, "word, count"); + + // set map-only job. + jobConf.setNumReduceTasks(1); + JobClient.runJob(jobConf); + BasicTableOutputFormat.close(jobConf); + return 0; + } +} Added: hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl?rev=917830&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl (added) +++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/test/smoke/zebra_smoke_run.pl Tue Mar 2 01:08:55 2010 @@ -0,0 +1,76 @@ +#!/usr/local/bin/perl +use File::Basename; +use Getopt::Long; +if (@ARGV > 0 ){ + GetOptions( \%options, + 'cmd:s', + 'smoke:s', + 'help:i' + ); +} +if (defined $options{'help'} or !defined $options{'smoke'}){ + print ("perl run.pl --smoke\n"); + print ("***************Enviroment Variables Needed***************\n"); + print ("HADOOP_HOME\n"); + print ("USER\n"); + print ("ZEBRA_JAR\n"); + print ("PIG_JAR\n"); + print ("ZEBRA_SMOKE_JUNIT_JAR\n"); + print ("CLASSPATH\n"); + print ("HADOOP_CLASSPATH\n"); + print ("ZEBRA_SMOKE_PIG_CLASS\n"); + print ("ZEBRA_SMOKE_MAPRED_CLASS\n"); + exit; +} + +#make an output directory +$zebraqa= $ENV{ZEBRA_SMOKE_DIR}; +mkdir("$zebraqa/output"); +$logfile="$zebraqa/output/zebra.out"; + +if (defined $options{'smoke'}){ + $my_hadoop_home=$ENV{HADOOP_HOME}; + defined($my_hadoop_home) || die("HADOOP_HOME not defined"); + $my_user=$ENV{USER}; 
# Remaining environment checks of the --smoke branch, followed by the two test runs.
  defined($my_user) || die("USER not defined");
  $my_zebra_jar=$ENV{ZEBRA_JAR};
  defined($my_zebra_jar) || die("ZEBRA_JAR not defined");
  $my_pig_jar=$ENV{PIG_JAR};
  defined($my_pig_jar) || die("PIG_JAR not defined");
  $my_junit_jar=$ENV{ZEBRA_SMOKE_JUNIT_JAR};
  defined($my_junit_jar) || die("ZEBRA_SMOKE_JUNIT_JAR not defined");
  $my_classpath=$ENV{CLASSPATH};
  defined($my_classpath) || die("CLASSPATH not defined");
  $my_smoke_pig_class=$ENV{ZEBRA_SMOKE_PIG_CLASS};
  defined($my_smoke_pig_class) || die("ZEBRA_SMOKE_PIG_CLASS not defined");
  $my_mapred_class=$ENV{ZEBRA_SMOKE_MAPRED_CLASS};
  defined($my_mapred_class) || die("ZEBRA_SMOKE_MAPRED_CLASS not defined");
  $my_hadoop_classpath=$ENV{HADOOP_CLASSPATH};
  defined($my_hadoop_classpath) || die("HADOOP_CLASSPATH not defined");

  # Start every run with an empty log; everything below appends to it, so the
  # banners written by write_to_log are no longer wiped by the first command.
  open(my $fresh_log, '>', $logfile) or die "can't open $logfile : $!\n";
  close($fresh_log) or die "can't close $logfile : $!\n";

  #execute pig job
  write_to_log("..... STARTING ZEBRA PIG JOB TESTING .....\n");
  $cmd="java -cp $my_classpath -DwhichCluster=\"realCluster\" -DHADOOP_HOME=$my_hadoop_home -DUSER=$my_user org.junit.runner.JUnitCore $my_smoke_pig_class >> $zebraqa/output/zebra.out 2>&1";
  exec_cmd($cmd);

  #execute mapred job
  write_to_log("..... STARTING ZEBRA MAP REDUCE JOB TESTING .....\n");
  $cmd="$my_hadoop_home/bin/hadoop jar $zebraqa/lib/zebra_smoke.jar $my_mapred_class -libjars $my_pig_jar,$my_zebra_jar,$my_junit_jar >> $zebraqa/output/zebra.out 2>&1";
  exec_cmd($cmd);
}

# Echoes the command, runs it through the shell and dies unless it succeeds.
# Returns the (zero) status of system() on success.
sub exec_cmd {
  my $cmd = shift or die "exec_cmd: Command not supplied!\n";
  print ($cmd."\n");
  my $rc = system($cmd);
  if ($rc != 0) {
    # system() returns -1 when the command could not be started; otherwise the
    # exit code is in the high byte of the wait status.
    my $status = ($rc == -1) ? "could not start: $!" : "exit code " . ($rc >> 8);
    die "ERROR($rc, $status): command failed: $cmd\n";
  }
  return $rc;
}

# Appends one message, followed by a newline, to the smoke-test log file.
sub write_to_log {
  my ($whatToWrite) = @_;
  open(my $log, '>>', $logfile) or die "can't open $logfile : $!\n";
  print {$log} $whatToWrite."\n";
  close($log) or die "can't close $logfile : $!\n";
}
