Author: pradeepkth Date: Fri Dec 4 20:12:48 2009 New Revision: 887338 URL: http://svn.apache.org/viewvc?rev=887338&view=rev Log: PIG-1090: Update sources to reflect recent changes in load-store interfaces - incremental commit to address getSchema() functionality in BinStorage (pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=887338&r1=887337&r2=887338&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Fri Dec 4 20:12:48 2009 @@ -18,32 +18,151 @@ package org.apache.pig; -import java.util.Map; +import java.util.Arrays; +import java.util.List; import org.apache.pig.data.DataType; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; -/** - * - */ public class ResourceSchema { - int version; + /* Array Getters intentionally return mutable arrays instead of copies, + * to simplify updates without unnecessary copying. + * Setters make a copy of the arrays in order to prevent an array + * from being shared by two objects, with modifications in one + * accidentally changing the other. + */ + + // initializing arrays to empty so we don't have to worry about NPEs + // setters won't set to null + public ResourceFieldSchema[] fields = new ResourceFieldSchema[0]; + + public enum Order { ASCENDING, DESCENDING } + public int[] sortKeys = {}; // each entry is an offset into the fields array. 
+ public Order[] sortKeyOrders = new Order[0]; + + + public int version = 0; - public class ResourceFieldSchema { + public static class ResourceFieldSchema { public String name; - /** - * byte representing type as enumerated in {@link DataType} - */ - public byte type; + // values are constants from DataType + public byte type; public String description; - public ResourceFieldSchema schema; // nested tuples and bags will have their own schema + + // nested tuples and bags will have their own schema + public ResourceSchema schema; + + public ResourceFieldSchema() { + + } + + public ResourceFieldSchema(FieldSchema fieldSchema) { + type = fieldSchema.type; + name = fieldSchema.alias; + description = "autogenerated from Pig Field Schema"; + if (type == DataType.BAG || type == DataType.TUPLE) { + schema = new ResourceSchema(fieldSchema.schema); + } else { + schema = null; + } + } + + public String getName() { + return name; + } + public ResourceFieldSchema setName(String name) { + this.name = name; + return this; + } + + public byte getType() { + return type; + } + public ResourceFieldSchema setType(byte type) { + this.type = type; + return this; + } + + public String getDescription() { + return description; + } + + public ResourceFieldSchema setDescription(String description) { + this.description = description; + return this; + } + + public ResourceSchema getSchema() { + return schema; + } + + public ResourceFieldSchema setSchema(ResourceSchema schema) { + this.schema = schema; + return this; + } } - public ResourceFieldSchema[] fields; + + public ResourceSchema() { + + } + + public ResourceSchema(Schema pigSchema) { + List<FieldSchema> pigSchemaFields = pigSchema.getFields(); + fields = new ResourceFieldSchema[pigSchemaFields.size()]; + for (int i=0; i<fields.length; i++) { + fields[i] = new ResourceFieldSchema(pigSchemaFields.get(i)); + } + + } + + public int getVersion() { + return version; + } + + public ResourceSchema setVersion(int version) { + this.version = 
version; + return this; + } + + public ResourceFieldSchema[] getFields() { + return fields; + } + + public String[] fieldNames() { + String[] names = new String[fields.length]; + for (int i=0; i<fields.length; i++) { + names[i] = fields[i].getName(); + } + return names; + } + + public ResourceSchema setFields(ResourceFieldSchema[] fields) { + if (fields != null) + this.fields = Arrays.copyOf(fields, fields.length); + return this; + } + + public int[] getSortKeys() { + return sortKeys; + } + public ResourceSchema setSortKeys(int[] sortKeys) { + if (sortKeys != null) + this.sortKeys = Arrays.copyOf(sortKeys, sortKeys.length); + return this; + } + + public Order[] getSortKeyOrders() { + return sortKeyOrders; + } - enum Order { ASCENDING, DESCENDING } - public int[] sortKeys; // each entry is an offset into the fields array. - public Order[] sortKeyOrders; + public ResourceSchema setSortKeyOrders(Order[] sortKeyOrders) { + if (sortKeyOrders != null) + this.sortKeyOrders = Arrays.copyOf(sortKeyOrders, sortKeyOrders.length); + return this; + } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=887338&r1=887337&r2=887338&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Fri Dec 4 20:12:48 2009 @@ -24,9 +24,11 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -38,23 +40,32 @@ 
import org.apache.pig.FileInputLoadFunc; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.datastorage.HDataStorage; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataReaderWriter; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BinStorageInputFormat; import org.apache.pig.impl.io.BinStorageOutputFormat; import org.apache.pig.impl.io.BinStorageRecordReader; import org.apache.pig.impl.io.BinStorageRecordWriter; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.ReadToEndLoader; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.util.LogUtils; public class BinStorage extends FileInputLoadFunc -implements LoadCaster, StoreFunc { +implements LoadCaster, StoreFunc, LoadMetadata { public static final int RECORD_1 = 0x01; @@ -321,6 +332,7 @@ return new BinStorageInputFormat(); } + @Override public int hashCode() { return 42; } @@ -365,4 +377,58 @@ throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } + + @Override + public String[] getPartitionKeys(String location, Configuration conf) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ResourceSchema getSchema(String location, Configuration conf) + throws IOException { + Properties props = ConfigurationUtil.toProperties(conf); + // since local mode now is implemented as hadoop's local mode + // we can treat either 
local or hadoop mode as hadoop mode - hence + // we can use HDataStorage and FileLocalizer.openDFSFile below + HDataStorage storage = new HDataStorage(props); + if (!FileLocalizer.fileExists(location, storage)) { + // At compile time in batch mode, the file may not exist + // (such as intermediate file). Just return null - the + // same way as we would if we did not get a valid record + return null; + } + ReadToEndLoader loader = new ReadToEndLoader(this, conf, location, 0); + // get the first record from the input file + // and figure out the schema from the data in + // the first record + Tuple t = loader.getNext(); + if(t == null) { + // we couldn't get a valid record from the input + return null; + } + int numFields = t.size(); + Schema s = new Schema(); + for (int i = 0; i < numFields; i++) { + try { + s.add(DataType.determineFieldSchema(t.get(i))); + } catch (Exception e) { + int errCode = 2104; + String msg = "Error while determining schema of BinStorage data."; + throw new ExecException(msg, errCode, PigException.BUG, e); + } + } + return new ResourceSchema(s); + } + + @Override + public ResourceStatistics getStatistics(String location, Configuration conf) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void setParitionFilter(OperatorPlan plan) throws IOException { + throw new UnsupportedOperationException(); + } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=887338&r1=887337&r2=887338&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Fri Dec 4 20:12:48 2009 @@ -850,6 +850,7 
@@ * Make a deep copy of a schema. * @throws CloneNotSupportedException */ + @Override public Schema clone() throws CloneNotSupportedException { Schema s = new Schema(); @@ -1620,13 +1621,6 @@ return new Schema(fsList); } - private static Schema getPigSchema(ResourceFieldSchema rfSchema) - throws FrontendException { - return new Schema(new FieldSchema(rfSchema.name, - rfSchema.schema == null ? null : getPigSchema(rfSchema.schema), - rfSchema.type)); - } - } Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=887338&r1=887337&r2=887338&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Fri Dec 4 20:12:48 2009 @@ -32,8 +32,11 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.ResourceSchema; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.builtin.BinStorage; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; @@ -45,6 +48,8 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.test.utils.GenRandomData; import org.apache.pig.test.utils.TestHelper; @@ -61,6 +66,7 @@ String inputFileName; String outputFileName; + @Override @Before public void 
setUp() throws Exception { pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); @@ -70,6 +76,7 @@ ".txt"; } + @Override @After public void tearDown() throws Exception { Util.deleteFile(cluster, inputFileName); @@ -201,7 +208,41 @@ ++size; } } + @Test + public void testBinStorageGetSchema() throws IOException, ParseException { + String input[] = new String[] { "hello\t1\t10.1", "bye\t2\t20.2" }; + String inputFileName = "testGetSchema-input.txt"; + String outputFileName = "testGetSchema-output.txt"; + try { + Util.createInputFile(pig.getPigContext(), + inputFileName, input); + String query = "a = load '" + inputFileName + "' as (c:chararray, " + + "i:int,d:double);store a into '" + outputFileName + "' using " + + "BinStorage();"; + pig.setBatchOn(); + Util.registerMultiLineQuery(pig, query); + pig.executeBatch(); + ResourceSchema rs = new BinStorage().getSchema(outputFileName, + ConfigurationUtil.toConfiguration(pig.getPigContext(). + getProperties())); + Schema expectedSchema = Util.getSchemaFromString( + "c:chararray,i:int,d:double"); + Assert.assertTrue("Checking binstorage getSchema output", Schema.equals( + expectedSchema, Schema.getPigSchema(rs), true, true)); + } finally { + Util.deleteFile(pig.getPigContext(), inputFileName); + Util.deleteFile(pig.getPigContext(), outputFileName); + } + } + private static void randomizeBytes(byte[] data, int offset, int length) { + Random random = new Random(); + for(int i=offset + length - 1; i >= offset; --i) { + data[i] = (byte) random.nextInt(256); + } + } + + @Test public void testStoreRemoteRel() throws Exception { checkStorePath("test","/tmp/test"); Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=887338&r1=887337&r2=887338&view=diff ============================================================================== --- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Fri Dec 4 20:12:48 2009 @@ -20,10 +20,8 @@ import static java.util.regex.Matcher.quoteReplacement; import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.ByteArrayInputStream; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; @@ -32,7 +30,6 @@ import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -41,6 +38,7 @@ import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -250,6 +248,11 @@ String[] inputData) throws IOException { FileSystem fs = miniCluster.getFileSystem(); + createInputFile(fs, fileName, inputData); + } + + static public void createInputFile(FileSystem fs, String fileName, + String[] inputData) throws IOException { if(fs.exists(new Path(fileName))) { throw new IOException("File " + fileName + " already exists on the minicluster"); } @@ -259,6 +262,7 @@ pw.println(inputData[i]); } pw.close(); + } /** @@ -296,7 +300,15 @@ fs.delete(new Path(fileName), true); } - /** + static public void deleteFile(PigContext pigContext, String fileName) + throws IOException { + Configuration conf = ConfigurationUtil.toConfiguration( + pigContext.getProperties()); + FileSystem fs = FileSystem.get(conf); + fs.delete(new Path(fileName), true); + } + + /** * Helper function to check if the result of a Pig Query is in line with * expected results. 
* @@ -489,4 +501,17 @@ } return(path.delete()); } + + /** + * @param pigContext + * @param fileName + * @param input + * @throws IOException + */ + public static void createInputFile(PigContext pigContext, + String fileName, String[] input) throws IOException { + Configuration conf = ConfigurationUtil.toConfiguration( + pigContext.getProperties()); + createInputFile(FileSystem.get(conf), fileName, input); + } }