Author: daijy Date: Thu Aug 5 21:07:37 2010 New Revision: 982786 URL: http://svn.apache.org/viewvc?rev=982786&view=rev Log: PIG-1526: HiveColumnarLoader Partitioning Support (missing some code in last check in)
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java?rev=982786&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java Thu Aug 5 21:07:37 2010 @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.pig.piggybank.storage.partition; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import javax.el.ELContext; +import javax.el.ELResolver; +import javax.el.ExpressionFactory; +import javax.el.FunctionMapper; +import javax.el.ValueExpression; +import javax.el.VariableMapper; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.log4j.Logger; +import org.apache.pig.LoadFunc; +import org.apache.pig.impl.util.UDFContext; + +/** + * Implements the logic for:<br/> + * <ul> + * <li>Listing partition keys and values used in an hdfs path</li> + * <li>Filtering of partitions from a pig filter operator expression</li> + * </ul> + * <p/> + * <b>Restrictions</b> <br/> + * Function calls are not supported by this partition helper and it can only + * handle String values.<br/> + * This is normally not a problem given that partition values are part of the + * hdfs folder path and is given a<br/> + * determined value that would not need parsing by any external processes.<br/> + * + * + */ +public class PathPartitionHelper { + + public static final String PARTITION_COLUMNS = PathPartitionHelper.class + + ".partition-columns"; + public static final String PARITITION_FILTER_EXPRESSION = PathPartitionHelper.class + .getName() + ".partition-filter"; + + private static final Logger LOG = Logger + .getLogger(PathPartitionHelper.class); + + transient PathPartitioner pathPartitioner = new PathPartitioner(); + + /** + * Returns the Partition keys and each key's value for a single location.<br/> + * That is the location must be something like + * mytable/partition1=a/partition2=b/myfile.<br/> + * This method will return a map with [partition1='a', partition2='b']<br/> + * The work is delegated to the PathPartitioner class + * + * @param location + * @return Map of String, String + * @throws IOException + */ + public Map<String, String> getPathPartitionKeyValues(String location) + throws IOException { + return pathPartitioner.getPathPartitionKeyValues(location); + } + + /** + * Returns the partition keys for a location.<br/> + * The work is delegated to the PathPartitioner class + * + * @param location + * String must be the base directory for the partitions + * @param conf + * @return + * @throws IOException + */ + public Set<String> getPartitionKeys(String location, Configuration conf) + throws IOException { + return pathPartitioner.getPartitionKeys(location, conf); + } + + /** + * Sets the PARITITION_FILTER_EXPRESSION property in the UDFContext + * identified by the loaderClass. + * + * @param partitionFilterExpression + * @param loaderClass + * @throws IOException + */ + public void setPartitionFilterExpression(String partitionFilterExpression, + Class<? extends LoadFunc> loaderClass, String signature) + throws IOException { + + UDFContext + .getUDFContext() + .getUDFProperties(loaderClass, new String[] { signature }) + .setProperty(PARITITION_FILTER_EXPRESSION, + partitionFilterExpression); + + } + + /** + * Reads the partition keys from the location i.e the base directory + * + * @param location + * String must be the base directory for the partitions + * @param conf + * @param loaderClass + * @throws IOException + */ + public void setPartitionKeys(String location, Configuration conf, + Class<? extends LoadFunc> loaderClass, String signature) + throws IOException { + + Set<String> partitionKeys = getPartitionKeys(location, conf); + + if (partitionKeys != null) { + StringBuilder buff = new StringBuilder(); + int i = 0; + for (String key : partitionKeys) { + if (i++ != 0) { + buff.append(","); + } + + buff.append(key); + } + + UDFContext.getUDFContext() + .getUDFProperties(loaderClass, new String[] { signature }) + .setProperty(PARTITION_COLUMNS, buff.toString()); + } + + } + + /** + * This method is called by the FileInputFormat to find the input paths for + * which splits should be calculated.<br/> + * If applyDateRanges == true: Then the HiveRCDateSplitter is used to apply + * filtering on the input files.<br/> + * Else the default FileInputFormat listStatus method is used. + * + * @param ctx + * JobContext + * @param loaderClass + * this is chosen to be a subclass of LoadFunc to maintain some + * consistency. + */ + public List<FileStatus> listStatus(JobContext ctx, + Class<? extends LoadFunc> loaderClass, String signature) + throws IOException { + + Properties properties = UDFContext.getUDFContext().getUDFProperties( + loaderClass, new String[] { signature }); + + String partitionExpression = properties + .getProperty(PARITITION_FILTER_EXPRESSION); + + ExpressionFactory expressionFactory = null; + + if (partitionExpression != null) { + expressionFactory = ExpressionFactory.newInstance(); + } + + String partitionColumnStr = properties + .getProperty(PathPartitionHelper.PARTITION_COLUMNS); + String[] partitionKeys = (partitionColumnStr == null) ? null + : partitionColumnStr.split(","); + + Path[] inputPaths = FileInputFormat.getInputPaths(ctx); + + List<FileStatus> splitPaths = null; + + if (partitionKeys != null) { + + splitPaths = new ArrayList<FileStatus>(); + + for (Path inputPath : inputPaths) { + // for each input path work recursively through each partition + // level to find the rc files + + FileSystem fs = inputPath.getFileSystem(ctx.getConfiguration()); + + if (fs.getFileStatus(inputPath).isDir()) { + // assure that we are at the root of the partition tree. + FileStatus fileStatusArr[] = fs.listStatus(inputPath); + + if (fileStatusArr != null) { + for (FileStatus childFileStatus : fileStatusArr) { + getPartitionedFiles(expressionFactory, + partitionExpression, fs, childFileStatus, + 0, partitionKeys, splitPaths); + } + } + + } else { + splitPaths.add(fs.getFileStatus(inputPath)); + } + + } + + if (splitPaths.size() < 1) { + LOG.error("Not split paths where found, please check that the filter logic for the partition keys does not filter out everything "); + } + + } + + return splitPaths; + } + + /** + * Recursively works through all directories, skipping filtered partitions. + * + * @param fs + * @param fileStatus + * @param partitionLevel + * @param partitionKeys + * @param splitPaths + * @throws IOException + */ + private void getPartitionedFiles(ExpressionFactory expressionFactory, + String partitionExpression, FileSystem fs, FileStatus fileStatus, + int partitionLevel, String[] partitionKeys, + List<FileStatus> splitPaths) throws IOException { + + String partition = (partitionLevel < partitionKeys.length) ? partitionKeys[partitionLevel] + : null; + + Path path = fileStatus.getPath(); + + // filter out hidden files + if (path.getName().startsWith("_")) { + return; + } + + // pre filter logic + // return if any of the logic is not true + if (partition != null) { + if (fileStatus.isDir()) { + + // check that the dir name is equal to that of the partition + // name + if (!path.getName().startsWith(partition)) + return; + + } else { + // else its a file but not at the end of the partition tree so + // its ignored. + return; + } + + // this means we are inside the partition so that the path will + // contain all partitions plus its values + // we can apply the partition filter expression here that was passed + // to the HiveColumnarLoader.setPartitionExpression + if (partitionLevel == (partitionKeys.length - 1) + && !evaluatePartitionExpression(expressionFactory, + partitionExpression, path)) { + + LOG.debug("Pruning partition: " + path); + return; + + } + + } + + // after this point we now that the partition is either null + // which means we are at the end of the partition tree and all files + // sub directories should be included. + // or that we are still navigating the partition tree. + int nextPartitionLevel = partitionLevel + 1; + + // iterate over directories if fileStatus is a dir. + FileStatus[] childStatusArr = null; + + if (fileStatus.isDir()) { + if ((childStatusArr = fs.listStatus(path)) != null) { + for (FileStatus childFileStatus : childStatusArr) { + getPartitionedFiles(expressionFactory, partitionExpression, + fs, childFileStatus, nextPartitionLevel, + partitionKeys, splitPaths); + } + } + } else { + // add file to splitPaths + splitPaths.add(fileStatus); + } + + } + + /** + * Evaluates the partitionExpression set in the + * HiveColumnarLoader.setPartitionExpression. * @ + * + * @param partitionExpression + * String + * @param path + * Path + * @return boolean + * @throws IOException + */ + private boolean evaluatePartitionExpression( + ExpressionFactory expressionFactory, String partitionExpression, + Path path) throws IOException { + + boolean ret = true; + + if (expressionFactory != null) { + if (!partitionExpression.startsWith("${")) { + partitionExpression = "${" + partitionExpression + "}"; + } + + Map<String, String> context = pathPartitioner + .getPathPartitionKeyValues(path.toString()); + + MapVariableMapper mapper = new MapVariableMapper(expressionFactory, + context); + VariableContext varContext = new VariableContext(mapper); + + ValueExpression evalExpression = expressionFactory + .createValueExpression(varContext, partitionExpression, + Boolean.class); + + ret = (Boolean) evalExpression.getValue(varContext); + + LOG.debug("Evaluated: " + partitionExpression + " returned: " + ret); + + } + + return ret; + } + + /** + * + * ELContext implementation containing the VariableMapper MapVariableMapper + * + */ + class VariableContext extends ELContext { + + VariableMapper variableMapper; + + VariableContext(VariableMapper variableMapper) { + this.variableMapper = variableMapper; + } + + @Override + public ELResolver getELResolver() { + // TODO Auto-generated method stub + return null; + } + + @Override + public FunctionMapper getFunctionMapper() { + return null; + } + + @Override + public VariableMapper getVariableMapper() { + return variableMapper; + } + + } + + /** + * Implementation for the VariableMapper that takes the values in a Map and + * creates ValueExpression objects for each. + * + */ + class MapVariableMapper extends VariableMapper { + private Map<String, ValueExpression> valueExpressionMap; + + public MapVariableMapper(ExpressionFactory expressionFactory, + Map<String, String> variableMap) { + + valueExpressionMap = new HashMap<String, ValueExpression>(); + + for (Entry<String, String> entry : variableMap.entrySet()) { + ValueExpression valExpr = expressionFactory + .createValueExpression(entry.getValue(), String.class); + valueExpressionMap.put(entry.getKey(), valExpr); + } + + } + + @Override + public ValueExpression resolveVariable(String variableName) { + return valueExpressionMap.get(variableName); + } + + @Override + public ValueExpression setVariable(String variableName, + ValueExpression valueExpression) { + return valueExpressionMap.put(variableName, valueExpression); + } + + } + +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java?rev=982786&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java Thu Aug 5 21:07:37 2010 @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.pig.piggybank.storage.partition; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * + * Its convenient sometimes to partition logs by date values or other e.g. + * country, city etc.<br/> + * A daydate partitioned hdfs directory might look something like:<br/> + * + * <pre> + * /logs/repo/mylog/ + * daydate=2010-01-01 + * daydate=2010-01-02 + * </pre> + * + * This class accepts a path like /logs/repo/mylog and return a map of the + * partition keys + */ +public class PathPartitioner { + + /** + * Note: this must be the path lowes in the Searches for the key=value pairs + * in the path pointer by the location parameter. + * + * @param location + * String root path in hdsf e.g. /user/hive/warehouse or + * /logs/repo + * @param conf + * Configuration + * @return Set of String. The order is maintained as per the directory tree. + * i.e. if /logs/repo/year=2010/month=2010 exists the first item in + * the set will be year and the second month. + * @throws IOException + */ + public Map<String, String> getPathPartitionKeyValues(String location) + throws IOException { + + // use LinkedHashSet because order is important here. + Map<String, String> partitionKeys = new LinkedHashMap<String, String>(); + + String[] pathSplit = location.split("/"); + + for (String pathSplitItem : pathSplit) { + parseAndPutKeyValue(pathSplitItem, partitionKeys); + } + + return partitionKeys; + } + + /** + * Searches for the key=value pairs in the path pointer by the location + * parameter. + * + * @param location + * String root path in hdsf e.g. /user/hive/warehouse or + * /logs/repo + * @param conf + * Configuration + * @return Set of String. The order is maintained as per the directory tree. + * i.e. if /logs/repo/year=2010/month=2010 exists the first item in + * the set will be year and the second month. + * @throws IOException + */ + public Set<String> getPartitionKeys(String location, Configuration conf) + throws IOException { + + // find the hive type partition key=value pairs from the path. + // first parse the string alone. + Path path = new Path(location); + FileSystem fs = path.getFileSystem(conf); + + FileStatus[] fileStatusArr = null; + + // use LinkedHashSet because order is important here. + Set<String> partitionKeys = new LinkedHashSet<String>(); + + parseAndPutKeyValue(location, partitionKeys); + + while (!((fileStatusArr = fs.listStatus(path)) == null || fs + .isFile(path))) { + for (FileStatus fileStatus : fileStatusArr) { + + path = fileStatus.getPath(); + + // ignore hidden directories + if (fileStatus.getPath().getName().startsWith("_") + || !fileStatus.isDir()) + continue; + + parseAndPutKeyValue(path.getName(), partitionKeys); + // at the first directory found stop the for loop after parsing + // for key value pairs + break; + } + + } + + return partitionKeys; + } + + private final void parseAndPutKeyValue(String pathName, + Map<String, String> partitionKeys) { + String[] keyValue = parsePathKeyValue(pathName); + if (keyValue != null) { + partitionKeys.put(keyValue[0], keyValue[1]); + } + + } + + private final void parseAndPutKeyValue(String pathName, + Set<String> partitionKeys) { + String[] keyValue = parsePathKeyValue(pathName); + if (keyValue != null) { + partitionKeys.add(keyValue[0]); + } + + } + + /** + * Will look for key=value pairs in the path for example: + * /user/hive/warehouse/mylogs/year=2010/month=07 + * + * @param path + * @return String[] [0]= key [1] = value + */ + public String[] parsePathKeyValue(String path) { + int slashIndex = path.lastIndexOf('/'); + String parsedPath = path; + String[] keyValue = null; + + if (slashIndex > 0) { + parsedPath = path.substring(slashIndex); + } + + if (parsedPath.contains("=")) { + String split[] = parsedPath.split("="); + if (split.length == 2) { + keyValue = split; + } + } + + return keyValue; + } + +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=982786&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Thu Aug 5 21:07:37 2010 @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.pig.piggybank.test.storage; + +import java.io.File; +import java.util.List; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.piggybank.storage.partition.PathPartitionHelper; +import org.apache.pig.test.Util; +import org.junit.Test; + +/** + * + * Tests the PathPartitionHelper can:<br/> + * <ul> + * <li>Filter path partitioned files based on an expression being true</li> + * <li>Filter path partitioned files based on an expression being false</li> + * <li>Filter path partitioned files with no expression</li> + * </ul> + * + */ +public class TestPathPartitionHelper extends TestCase { + + private static Configuration conf = null; + + File baseDir; + File partition1; + File partition2; + File partition3; + + @Test + public void testListStatusPartitionFilterNotFound() throws Exception { + + PathPartitionHelper partitionHelper = new PathPartitionHelper(); + + Job job = new Job(conf); + job.setJobName("TestJob"); + job.setInputFormatClass(FileInputFormat.class); + + Configuration conf = job.getConfiguration(); + FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath())); + + JobContext jobContext = new JobContext(conf, job.getJobID()); + + partitionHelper.setPartitionFilterExpression("year < '2010'", + PigStorage.class, "1"); + partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf, + PigStorage.class, "1"); + + List<FileStatus> files = partitionHelper.listStatus(jobContext, + PigStorage.class, "1"); + + assertEquals(0, files.size()); + + } + + @Test + public void testListStatusPartitionFilterFound() throws Exception { + + PathPartitionHelper partitionHelper = new PathPartitionHelper(); + + Job job = new Job(conf); + job.setJobName("TestJob"); + job.setInputFormatClass(FileInputFormat.class); + + Configuration conf = job.getConfiguration(); + FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath())); + + JobContext jobContext = new JobContext(conf, job.getJobID()); + + partitionHelper.setPartitionFilterExpression( + "year<='2010' and month=='01' and day>='01'", PigStorage.class, "2"); + partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf, + PigStorage.class, "2"); + + List<FileStatus> files = partitionHelper.listStatus(jobContext, + PigStorage.class, "2"); + + assertNotNull(files); + assertEquals(1, files.size()); + + } + + @Test + public void testListStatus() throws Exception { + + PathPartitionHelper partitionHelper = new PathPartitionHelper(); + + Job job = new Job(conf); + job.setJobName("TestJob"); + job.setInputFormatClass(FileInputFormat.class); + + Configuration conf = job.getConfiguration(); + FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath())); + + JobContext jobContext = new JobContext(conf, job.getJobID()); + + partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf, + PigStorage.class, "3"); + + List<FileStatus> files = partitionHelper.listStatus(jobContext, + PigStorage.class, "3"); + + assertNotNull(files); + assertEquals(1, files.size()); + + } + + @Override + protected void tearDown() throws Exception { + + Util.deleteDirectory(baseDir); + + } + + @Override + protected void setUp() throws Exception { + conf = new Configuration(false); + + baseDir = createDir(null, + "testPathPartitioner-testGetKeys-" + System.currentTimeMillis()); + + partition1 = createDir(baseDir, "year=2010"); + partition2 = createDir(partition1, "month=01"); + partition3 = createDir(partition2, "day=01"); + + File file = new File(partition3, "testfile-" + + System.currentTimeMillis()); + file.createNewFile(); + + } + + private File createDir(File parent, String name) { + File file = new File(parent, name); + file.mkdirs(); + return file; + } + +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=982786&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Thu Aug 5 21:07:37 2010 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.pig.piggybank.test.storage; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.piggybank.storage.partition.PathPartitioner; +import org.apache.pig.test.Util; +import org.junit.Test; + +/** + * + * Tests that the PathPartitioner can:<br/> + * <ul> + * <li>Read keys from a partitioned file path</li> + * <li>Read keys and values from a partitioned file path</li> + * </ul> + * + */ +public class TestPathPartitioner extends TestCase { + + private static Configuration conf = null; + + File baseDir; + File partition1; + File partition2; + File partition3; + + @Override + protected void tearDown() throws Exception { + + Util.deleteDirectory(baseDir); + + } + + @Override + protected void setUp() throws Exception { + conf = new Configuration(); + + baseDir = createDir(null, + "testPathPartitioner-testGetKeys-" + System.currentTimeMillis()); + + partition1 = createDir(baseDir, "year=2010"); + partition2 = createDir(partition1, "month=01"); + partition3 = createDir(partition2, "day=01"); + + File file = new File(partition3, "testfile-" + + System.currentTimeMillis()); + file.createNewFile(); + + } + + @Test + public void testGetKeyValues() throws Exception { + PathPartitioner partitioner = new PathPartitioner(); + + Map<String, String> map = partitioner + .getPathPartitionKeyValues(partition3.getAbsolutePath()); + + String[] keys = map.keySet().toArray(new String[] {}); + + assertEquals("2010", map.get(keys[0])); + assertEquals("01", map.get(keys[1])); + assertEquals("01", map.get(keys[2])); + + } + + @Test + public void testGetKeys() throws Exception { + + PathPartitioner pathPartitioner = new PathPartitioner(); + Set<String> keys = pathPartitioner.getPartitionKeys( + baseDir.getAbsolutePath(), conf); + + assertNotNull(keys); + assertEquals(3, keys.size()); + + String[] keyArr = keys.toArray(new String[] {}); + + assertEquals("year", keyArr[0]); + assertEquals("month", keyArr[1]); + assertEquals("day", keyArr[2]); + + } + + private File createDir(File parent, String name) { + File file = new File(parent, name); + file.mkdirs(); + return file; + } + +}