Author: daijy Date: Mon Aug 9 05:27:39 2010 New Revision: 983524 URL: http://svn.apache.org/viewvc?rev=983524&view=rev Log: PIG-1526: HiveColumnarLoader Partitioning Support (fix piggybank unit test failure)
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/build.xml?rev=983524&r1=983523&r2=983524&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/build.xml (original) +++ hadoop/pig/trunk/contrib/piggybank/java/build.xml Mon Aug 9 05:27:39 2010 @@ -116,7 +116,7 @@ <echo> *** Running UDF tests ***</echo> <delete dir="${test.logs}"/> <mkdir dir="${test.logs}"/> - <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="256m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed"> + <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed"> <sysproperty key="hadoop.log.dir" value="${test.logs}"/> <classpath refid="test.classpath"/> <formatter type="${test.junit.output.format}" /> Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java?rev=983524&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java (added) +++ 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java Mon Aug 9 05:27:39 2010 @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.pig.piggybank.test.storage; + +import java.io.File; +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.RCFile; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.pig.ExecType; +import org.apache.pig.FuncSpec; +import org.apache.pig.PigServer; +import 
org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.Util;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that the HiveColumnarLoader can:
 * <ul>
 * <li>Load files without partitioning</li>
 * <li>Load files with partitioning and dates defined in constructor, or as a
 * filter</li>
 * <li>Load files using pig's push down loader capabilities.</li>
 * </ul>
 *
 * Every test gets a fresh set of RCFile fixtures written by {@link #setUp()}
 * and removed again by {@link #tearDown()}.
 */
public class TestHiveColumnarLoader extends TestCase {

    /** Fully qualified class name of the loader under test. */
    private static final String LOADER_CLASS = "org.apache.pig.piggybank.storage.HiveColumnarLoader";

    /** Schema argument passed to every loader instance. */
    private static final String SCHEMA_ARG = "'f1 string,f2 string,f3 string'";

    /** Passed to {@link #countTuples} when the tuple width must not be checked. */
    private static final int NO_FIELD_CHECK = -1;

    /**
     * Day of month the generated partitions start on. The original code passed
     * Calendar.MONDAY here, whose numeric value is 2.
     */
    private static final int START_DAY_OF_MONTH = 2;

    /** Number of daydate=... directories and RCFiles written per directory. */
    private static final int DATE_PARTITIONS = 4;
    private static final int FILES_PER_DATE = 5;

    /**
     * Year used for the year/month/day/hour fixture. The tests filter on this
     * value, so the fixture must not depend on the current date.
     */
    private static final int PARTITION_YEAR = 2010;
    private static final int PARTITION_YEARS = 1;
    private static final int PARTITION_MONTHS = 2;
    private static final int PARTITION_DAYS = 3;
    private static final int PARTITION_HOURS = 4;

    static Configuration conf = null;

    // for single non partitioned file testing
    static File simpleDataFile = null;
    // for multiple non partitioned file testing
    static File simpleDataDir = null;

    static File datePartitionedDir = null;
    static File yearMonthDayHourPartitionedDir = null;

    // used for cleanup
    static List<String> datePartitionedRCFiles;
    static List<String> datePartitionedDirs;

    static private FileSystem fs;

    static int columnMaxSize = 30;

    static int columnCount = 3;

    static int simpleDirFileCount = 3;
    static int simpleRowCount = 10;

    static String endingDate = null;
    static String startingDate = null;
    static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    static Calendar calendar = null;
    static int datePartitionedRowCount;

    private static Calendar yearMonthDayHourcalendar;

    /**
     * Creates the local file system handle and writes all three fixtures:
     * simple data, date partitioned data and year/month/day/hour partitioned
     * data.
     */
    @Override
    public synchronized void setUp() throws Exception {
        conf = new Configuration();
        fs = LocalFileSystem.getLocal(conf);

        produceSimpleData();
        produceDatePartitionedData();
        produceYearMonthDayHourPartitionedData();
    }

    /** Removes every directory and file created by {@link #setUp()}. */
    @Override
    public void tearDown() {
        Util.deleteDirectory(datePartitionedDir);
        Util.deleteDirectory(yearMonthDayHourPartitionedDir);
        Util.deleteDirectory(simpleDataDir);
        simpleDataFile.delete();
    }

    @Test
    public void testReadingSingleFileNoProjections() throws IOException {
        PigServer server = loadAsA(simpleDataFile.getAbsolutePath(),
                plainLoaderSpec());

        int count = countTuples(server.openIterator("a"), 3, true);

        Assert.assertEquals(simpleRowCount, count);
    }

    @Test
    public void testReadingMultipleNonPartitionedFiles() throws IOException {
        PigServer server = loadAsA(simpleDataDir.getAbsolutePath(),
                plainLoaderSpec());
        server.registerQuery("b = foreach a generate f1;");

        int count = countTuples(server.openIterator("b"), 1, true);

        Assert.assertEquals(simpleDirFileCount * simpleRowCount, count);
    }

    @Test
    public void testReadingSingleFile() throws IOException {
        PigServer server = loadAsA(simpleDataFile.getAbsolutePath(),
                plainLoaderSpec());
        server.registerQuery("b = foreach a generate f1;");

        int count = countTuples(server.openIterator("b"), 1, true);

        Assert.assertEquals(simpleRowCount, count);
    }

    @Test
    public void testYearMonthDayHourPartitionedFilesWithProjection()
            throws IOException {
        PigServer server = loadAsA(
                yearMonthDayHourPartitionedDir.getAbsolutePath(),
                plainLoaderSpec());
        server.registerQuery("f = FILTER a by year=='" + PARTITION_YEAR + "';");
        server.registerQuery("b = foreach f generate f1;");

        int count = countTuples(server.openIterator("b"), 1, true);

        Assert.assertEquals(yearMonthDayHourRowCount(), count);
    }

    @Test
    public void testYearMonthDayHourPartitionedFilesWithProjectionAndPartitionColumns()
            throws IOException {
        PigServer server = loadAsA(
                yearMonthDayHourPartitionedDir.getAbsolutePath(),
                plainLoaderSpec());
        server.registerQuery("f = FILTER a by year=='" + PARTITION_YEAR + "';");
        server.registerQuery("r = foreach f generate year, f2, f3, month, day, hour;");
        server.registerQuery("b = ORDER r BY year, month, day, hour;");

        Iterator<Tuple> result = server.openIterator("b");
        int count = 0;
        while (result.hasNext()) {
            Tuple t = result.next();
            System.out.println("Tuple: " + t);
            assertEquals(6, t.size());
            count++;
        }
        System.out.println("Count: " + count);

        Assert.assertEquals(yearMonthDayHourRowCount(), count);
    }

    @Test
    public void test1DayDatePartitionedFilesWithProjection() throws IOException {
        String funcSpecString = dateRangeLoaderSpec(startingDate, startingDate);
        System.out.println(funcSpecString);

        PigServer server = loadAsA(datePartitionedDir.getAbsolutePath(),
                funcSpecString);
        server.registerQuery("b = FOREACH a GENERATE f2 as p;");

        int count = countTuples(server.openIterator("b"), 1, true);

        Assert.assertEquals(FILES_PER_DATE * simpleRowCount, count);
    }

    @Test
    public void test1DayDatePartitionedFiles() throws IOException {
        String funcSpecString = dateRangeLoaderSpec(startingDate, startingDate);
        System.out.println(funcSpecString);

        PigServer server = loadAsA(datePartitionedDir.getAbsolutePath(),
                funcSpecString);

        int count = countTuples(server.openIterator("a"), NO_FIELD_CHECK,
                false);

        Assert.assertEquals(FILES_PER_DATE * simpleRowCount, count);
    }

    @Test
    public void testDatePartitionedFiles() throws IOException {
        String funcSpecString = dateRangeLoaderSpec(startingDate, endingDate);
        System.out.println(funcSpecString);

        PigServer server = loadAsA(datePartitionedDir.getAbsolutePath(),
                funcSpecString);

        int count = countTuples(server.openIterator("a"), NO_FIELD_CHECK,
                false);

        Assert.assertEquals(datePartitionedRowCount, count);
    }

    /** Loader spec with the schema only (no date range). */
    private static String plainLoaderSpec() {
        return LOADER_CLASS + "(" + SCHEMA_ARG + ")";
    }

    /** Loader spec with the schema and a 'from:to' date range argument. */
    private static String dateRangeLoaderSpec(String from, String to) {
        return LOADER_CLASS + "(" + SCHEMA_ARG + ", '" + from + ":" + to
                + "')";
    }

    /**
     * Starts a local PigServer in batch mode, registers the loader and loads
     * the given location into alias "a".
     *
     * @param location file or directory to load
     * @param funcSpecString loader spec used both for registration and in the
     *            LOAD statement
     * @return the server, ready for further queries
     */
    private static PigServer loadAsA(String location, String funcSpecString)
            throws IOException {
        PigServer server = new PigServer(ExecType.LOCAL);
        server.setBatchOn();
        server.registerFunction(LOADER_CLASS, new FuncSpec(funcSpecString));
        server.registerQuery("a = LOAD '" + location + "' using "
                + funcSpecString + ";");
        return server;
    }

    /**
     * Drains the iterator and returns the number of tuples seen.
     *
     * @param tuples result iterator
     * @param expectedFields asserted tuple width, or {@link #NO_FIELD_CHECK}
     * @param firstFieldIsChararray when true, asserts that field 0 is a
     *            chararray
     */
    private static int countTuples(Iterator<Tuple> tuples, int expectedFields,
            boolean firstFieldIsChararray) throws IOException {
        int count = 0;
        while (tuples.hasNext()) {
            Tuple t = tuples.next();
            if (expectedFields != NO_FIELD_CHECK) {
                assertEquals(expectedFields, t.size());
            }
            if (firstFieldIsChararray) {
                assertEquals(DataType.CHARARRAY, t.getType(0));
            }
            count++;
        }
        return count;
    }

    /** Rows written by {@link #produceYearMonthDayHourPartitionedData()}. */
    private static int yearMonthDayHourRowCount() {
        return PARTITION_YEARS * PARTITION_MONTHS * PARTITION_DAYS
                * PARTITION_HOURS * simpleRowCount;
    }

    /**
     * Writes {@link #DATE_PARTITIONS} consecutive daydate=yyyy-MM-dd
     * directories, each holding {@link #FILES_PER_DATE} RCFiles, and records
     * the first date in {@link #startingDate}, the date after the last
     * partition in {@link #endingDate} and the total number of rows in
     * {@link #datePartitionedRowCount}.
     */
    private static void produceDatePartitionedData() throws IOException {
        datePartitionedRowCount = 0;
        datePartitionedDir = new File("testhiveColumnarLoader-dateDir-"
                + System.currentTimeMillis());
        datePartitionedDir.mkdir();
        datePartitionedDir.deleteOnExit();

        calendar = Calendar.getInstance();
        calendar.set(Calendar.DAY_OF_MONTH, START_DAY_OF_MONTH);
        calendar.set(Calendar.MONTH, Calendar.JANUARY);

        startingDate = dateFormat.format(calendar.getTime());

        datePartitionedRCFiles = new ArrayList<String>();
        datePartitionedDirs = new ArrayList<String>();

        for (int i = 0; i < DATE_PARTITIONS; i++) {
            File dateDir = new File(datePartitionedDir, "daydate="
                    + dateFormat.format(calendar.getTime()));
            calendar.add(Calendar.DAY_OF_MONTH, 1);

            dateDir.mkdir();
            dateDir.deleteOnExit();

            for (int pi = 0; pi < FILES_PER_DATE; pi++) {
                Path path = new Path(new Path(dateDir.getAbsolutePath()),
                        "parition" + pi);

                datePartitionedRowCount += writeRCFileTest(fs, simpleRowCount,
                        path, columnCount, new DefaultCodec(), columnCount);

                new File(path.toString()).deleteOnExit();
                datePartitionedRCFiles.add(path.toString());
                datePartitionedDirs.add(dateDir.toString());
            }
        }

        endingDate = dateFormat.format(calendar.getTime());
    }

    /**
     * Writes a year=/month=/day=/hour= directory tree with one RCFile per hour
     * directory. The calendar is pinned to {@link #PARTITION_YEAR} so that the
     * year filter used by the tests matches regardless of when they run.
     */
    private static void produceYearMonthDayHourPartitionedData()
            throws IOException {
        yearMonthDayHourPartitionedDir = new File(
                "testhiveColumnarLoader-yearMonthDayHourDir-"
                        + System.currentTimeMillis());
        yearMonthDayHourPartitionedDir.mkdir();
        yearMonthDayHourPartitionedDir.deleteOnExit();

        yearMonthDayHourcalendar = Calendar.getInstance();
        yearMonthDayHourcalendar.set(Calendar.YEAR, PARTITION_YEAR);
        yearMonthDayHourcalendar.set(Calendar.DAY_OF_MONTH,
                START_DAY_OF_MONTH);
        yearMonthDayHourcalendar.set(Calendar.MONTH, Calendar.JANUARY);

        for (int yearIndex = 0; yearIndex < PARTITION_YEARS; yearIndex++) {
            File yearDir = new File(yearMonthDayHourPartitionedDir, "year="
                    + yearMonthDayHourcalendar.get(Calendar.YEAR));
            yearDir.mkdir();
            yearDir.deleteOnExit();

            for (int monthIndex = 0; monthIndex < PARTITION_MONTHS; monthIndex++) {
                // Calendar.MONTH is zero based, so January is written as month=0
                File monthDir = new File(yearDir, "month="
                        + yearMonthDayHourcalendar.get(Calendar.MONTH));
                monthDir.mkdir();
                monthDir.deleteOnExit();

                for (int dayIndex = 0; dayIndex < PARTITION_DAYS; dayIndex++) {
                    File dayDir = new File(monthDir, "day="
                            + yearMonthDayHourcalendar
                                    .get(Calendar.DAY_OF_MONTH));
                    dayDir.mkdir();
                    dayDir.deleteOnExit();

                    for (int hourIndex = 0; hourIndex < PARTITION_HOURS; hourIndex++) {
                        File hourDir = new File(dayDir, "hour="
                                + yearMonthDayHourcalendar
                                        .get(Calendar.HOUR_OF_DAY));
                        hourDir.mkdir();
                        hourDir.deleteOnExit();

                        File rcFile = new File(hourDir.getAbsolutePath()
                                + "/attempt-00000");
                        Path hourFilePath = new Path(rcFile.getAbsolutePath());
                        rcFile.deleteOnExit();

                        writeRCFileTest(fs, simpleRowCount, hourFilePath,
                                columnCount, new DefaultCodec(), columnCount);

                        yearMonthDayHourcalendar.add(Calendar.HOUR_OF_DAY, 1);
                    }

                    yearMonthDayHourcalendar.add(Calendar.DAY_OF_MONTH, 1);
                }
                yearMonthDayHourcalendar.add(Calendar.MONTH, 1);
            }
        }
    }

    /**
     * Writes one temporary RCFile ({@link #simpleDataFile}) and a directory
     * ({@link #simpleDataDir}) holding {@link #simpleDirFileCount} RCFiles.
     * Every file has {@link #columnCount} columns and {@link #simpleRowCount}
     * rows of pseudo random characters.
     *
     * @throws SerDeException
     * @throws IOException
     */
    private static final void produceSimpleData() throws SerDeException,
            IOException {
        // produce one single file
        simpleDataFile = File.createTempFile("testhiveColumnarLoader", ".txt");
        simpleDataFile.deleteOnExit();

        writeRCFileTest(fs, simpleRowCount,
                new Path(simpleDataFile.getPath()), columnCount,
                new DefaultCodec(), columnCount);

        // produce a folder of simple data
        simpleDataDir = new File("simpleDataDir" + System.currentTimeMillis());
        simpleDataDir.mkdir();

        for (int i = 0; i < simpleDirFileCount; i++) {
            // a local is used on purpose: simpleDataFile must keep pointing at
            // the temporary file so single file tests read it and tearDown
            // removes it
            File dirFile = new File(simpleDataDir, "testhiveColumnarLoader-"
                    + i + ".txt");

            writeRCFileTest(fs, simpleRowCount, new Path(dirFile.getPath()),
                    columnCount, new DefaultCodec(), columnCount);
        }
    }

    static Random randomCharGenerator = new Random(3);

    static Random randColLenGenerator = new Random(20);

    /** Re-seeds both generators so that every file gets the same content. */
    private static void resetRandomGenerators() {
        randomCharGenerator = new Random(3);
        randColLenGenerator = new Random(20);
    }

    /**
     * Writes an RCFile of random rows, replacing any existing file.
     *
     * @param fs file system to write to
     * @param rowCount number of rows to append
     * @param file target path
     * @param columnNum number of columns per row
     * @param codec compression codec handed to the writer
     * @param columnCount number of bytes written into each column value
     * @return the number of rows written
     */
    private static int writeRCFileTest(FileSystem fs, int rowCount, Path file,
            int columnNum, CompressionCodec codec, int columnCount)
            throws IOException {
        fs.delete(file, true);
        int rowsWritten = 0;

        resetRandomGenerators();

        RCFileOutputFormat.setColumnNumber(conf, columnNum);
        RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, codec);
        try {
            byte[][] columnRandom = new byte[columnNum][];
            BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnNum);
            for (int i = 0; i < columnNum; i++) {
                bytes.set(i, new BytesRefWritable());
            }

            for (int i = 0; i < rowCount; i++) {
                nextRandomRow(columnRandom, bytes, columnCount);
                rowsWritten++;
                writer.append(bytes);
            }
        } finally {
            // release the file handle even when a write fails
            writer.close();
        }

        return rowsWritten;
    }

    /**
     * Fills every column of the row with columnCount random characters and
     * points the matching entry of bytes at it.
     */
    private static void nextRandomRow(byte[][] row,
            BytesRefArrayWritable bytes, int columnCount) {
        bytes.resetValid(row.length);
        for (int i = 0; i < row.length; i++) {
            row[i] = new byte[columnCount];
            for (int j = 0; j < columnCount; j++) {
                row[i][j] = getRandomChar(randomCharGenerator);
            }
            bytes.get(i).set(row[i], 0, columnCount);
        }
    }

    private static int CHAR_END = 122 - 7;

    /**
     * Returns a random ASCII letter. Values up to 90 ('Z') are returned as
     * is; larger values are shifted by 7 to skip the punctuation between 'Z'
     * and 'a'.
     */
    private static byte getRandomChar(Random random) {
        byte b = 0;
        do {
            b = (byte) random.nextInt(CHAR_END);
        } while ((b < 65));
        if (b > 90) {
            // NOTE(review): was "b = 7", which emitted the BEL control
            // character; CHAR_END = 122 - 7 indicates a shift was intended
            b += 7;
        }
        return b;
    }
}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=983524&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Mon Aug 9 05:27:39 2010 @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */
package org.apache.pig.piggybank.test.storage;

import java.io.File;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
import org.apache.pig.test.Util;
import org.junit.Test;

/**
 * Exercises PathPartitionHelper.listStatus against a small
 * year=/month=/day= directory tree holding one file:
 * <ul>
 * <li>with a filter expression that matches the file</li>
 * <li>with a filter expression that matches nothing</li>
 * <li>without any filter expression</li>
 * </ul>
 */
public class TestPathPartitionHelper extends TestCase {

    private static Configuration conf = null;

    File baseDir;
    File partition1;
    File partition2;
    File partition3;

    /**
     * Removes a stale hadoop-site.xml from the user's pigtest directory and
     * builds baseDir/year=2010/month=01/day=01 with a single empty file in
     * it.
     */
    @Override
    protected void setUp() throws Exception {
        File oldConf = new File(System.getProperty("user.home")+"/pigtest/conf/hadoop-site.xml");
        oldConf.delete();
        conf = new Configuration(false);

        baseDir = createDir(null,
                "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());

        partition1 = createDir(baseDir, "year=2010");
        partition2 = createDir(partition1, "month=01");
        partition3 = createDir(partition2, "day=01");

        new File(partition3, "testfile-" + System.currentTimeMillis())
                .createNewFile();
    }

    /** Deletes the directory tree created by {@link #setUp()}. */
    @Override
    protected void tearDown() throws Exception {
        Util.deleteDirectory(baseDir);
    }

    @Test
    public void testListStatusPartitionFilterNotFound() throws Exception {
        List<FileStatus> matched = listPartitionedFiles("year < '2010'", "1");

        assertEquals(0, matched.size());
    }

    @Test
    public void testListStatusPartitionFilterFound() throws Exception {
        List<FileStatus> matched = listPartitionedFiles(
                "year<='2010' and month=='01' and day>='01'", "2");

        assertNotNull(matched);
        assertEquals(1, matched.size());
    }

    @Test
    public void testListStatus() throws Exception {
        List<FileStatus> matched = listPartitionedFiles(null, "3");

        assertNotNull(matched);
        assertEquals(1, matched.size());
    }

    /**
     * Configures a job whose input path is baseDir and asks a new
     * PathPartitionHelper for the files it selects.
     *
     * @param filterExpression partition filter to set, or null to set none
     * @param signature signature passed to every helper call
     * @return whatever listStatus returns for the job
     */
    private List<FileStatus> listPartitionedFiles(String filterExpression,
            String signature) throws Exception {
        PathPartitionHelper helper = new PathPartitionHelper();

        Job job = new Job(conf);
        job.setJobName("TestJob");
        job.setInputFormatClass(FileInputFormat.class);

        Configuration jobConf = job.getConfiguration();
        FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));

        JobContext jobContext = new JobContext(jobConf, job.getJobID());

        if (filterExpression != null) {
            helper.setPartitionFilterExpression(filterExpression,
                    PigStorage.class, signature);
        }
        helper.setPartitionKeys(baseDir.getAbsolutePath(), jobConf,
                PigStorage.class, signature);

        return helper.listStatus(jobContext, PigStorage.class, signature);
    }

    /** Creates the directory parent/name, including missing parents. */
    private File createDir(File parent, String name) {
        File dir = new File(parent, name);
        dir.mkdirs();
        return dir;
    }

}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=983524&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Mon Aug 9 05:27:39 2010 @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */
package org.apache.pig.piggybank.test.storage;

import java.io.File;
import java.util.Map;
import java.util.Set;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.piggybank.storage.partition.PathPartitioner;
import org.apache.pig.test.Util;
import org.junit.Test;

/**
 * Tests that the PathPartitioner can:<br/>
 * <ul>
 * <li>Read keys from a partitioned file path</li>
 * <li>Read keys and values from a partitioned file path</li>
 * </ul>
 * The fixture is a baseDir/year=2010/month=01/day=01 tree holding one file.
 */
public class TestPathPartitioner extends TestCase {

    private static Configuration conf = null;

    File baseDir;
    File partition1;
    File partition2;
    File partition3;

    /** Deletes the directory tree created by {@link #setUp()}. */
    @Override
    protected void tearDown() throws Exception {
        Util.deleteDirectory(baseDir);
    }

    /**
     * Removes a stale hadoop-site.xml from the user's pigtest directory and
     * creates the partitioned directory tree with one empty file in the
     * innermost directory.
     */
    @Override
    protected void setUp() throws Exception {
        File oldConf = new File(System.getProperty("user.home")+"/pigtest/conf/hadoop-site.xml");
        oldConf.delete();
        conf = new Configuration();

        baseDir = createDir(null,
                "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());

        partition1 = createDir(baseDir, "year=2010");
        partition2 = createDir(partition1, "month=01");
        partition3 = createDir(partition2, "day=01");

        File file = new File(partition3, "testfile-"
                + System.currentTimeMillis());
        file.createNewFile();
    }

    /**
     * Checks the value read for each partition key of the innermost
     * directory.
     */
    @Test
    public void testGetKeyValues() throws Exception {
        PathPartitioner partitioner = new PathPartitioner();

        Map<String, String> map = partitioner
                .getPathPartitionKeyValues(partition3.getAbsolutePath());

        assertNotNull(map);

        // Values are looked up by key name rather than by position in the key
        // set: month and day share the value "01", so a positional check
        // cannot tell them apart and depends on the map's iteration order.
        // NOTE(review): assumes the keys are the directory name prefixes
        // "year", "month" and "day", as getPartitionKeys reports them.
        assertEquals("2010", map.get("year"));
        assertEquals("01", map.get("month"));
        assertEquals("01", map.get("day"));
    }

    /**
     * Checks that the partition keys found below baseDir are reported in
     * directory nesting order.
     */
    @Test
    public void testGetKeys() throws Exception {
        PathPartitioner pathPartitioner = new PathPartitioner();

        Set<String> keys = pathPartitioner.getPartitionKeys(
                baseDir.getAbsolutePath(), conf);

        assertNotNull(keys);
        assertEquals(3, keys.size());

        String[] keyArr = keys.toArray(new String[] {});

        assertEquals("year", keyArr[0]);
        assertEquals("month", keyArr[1]);
        assertEquals("day", keyArr[2]);
    }

    /** Creates the directory parent/name, including missing parents. */
    private File createDir(File parent, String name) {
        File file = new File(parent, name);
        file.mkdirs();
        return file;
    }

}