Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java Wed Jan 20 20:08:28 2010 @@ -20,27 +20,13 @@ import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.pig.ExecType; -import org.apache.pig.experimental.JsonMetadata; -import org.apache.pig.experimental.LoadMetadata; -import org.apache.pig.experimental.StoreMetadata; -import org.apache.pig.experimental.ResourceSchema; -import org.apache.pig.experimental.ResourceStatistics; -import org.apache.pig.StoreConfig; -import org.apache.pig.backend.datastorage.DataStorage; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.datastorage.HDataStorage; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.Expression; +import org.apache.pig.LoadMetadata; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.StoreMetadata; import org.apache.pig.builtin.PigStorage; -import org.apache.pig.data.DataType; -import org.apache.pig.impl.io.FileLocalizer; -import 
org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; /** * This Load/Store Func reads/writes metafiles that allow the schema and @@ -54,9 +40,7 @@ * Due to StoreFunc limitations, you can only write the metafiles in MapReduce * mode. You can read them in Local or MapReduce mode. */ -public class PigStorageSchema extends PigStorage implements StoreMetadata { - - private static final Log log = LogFactory.getLog(PigStorageSchema.class); +public class PigStorageSchema extends PigStorage implements LoadMetadata, StoreMetadata { public PigStorageSchema() { super(); @@ -65,61 +49,50 @@ public PigStorageSchema(String delim) { super(delim); } + + //------------------------------------------------------------------------ + // Implementation of LoadMetaData interface @Override - public Schema determineSchema(String fileName, ExecType execType, - DataStorage storage) throws IOException { + public ResourceSchema getSchema(String location, + Configuration conf) throws IOException { + return (new JsonMetadata()).getSchema(location, conf); + } + + @Override + public ResourceStatistics getStatistics(String location, + Configuration conf) throws IOException { + return null; + } - // TODO fullPath should be retrieved ia relativeToAbsolutePath once PIG-966 is complete - String fullPath = FileLocalizer.fullPath(fileName, storage); - LoadMetadata metadataLoader = new JsonMetadata(fullPath, storage); - ResourceSchema resourceSchema = metadataLoader.getSchema(fullPath, null); - if (resourceSchema == null) { - return null; - } - Schema pigSchema = new Schema(); - for (ResourceSchema.ResourceFieldSchema field : resourceSchema.getFields()) { - FieldSchema pigFieldSchema = DataType.determineFieldSchema(field); - // determineFieldSchema only sets the types. we also want the aliases. 
- // TODO this doesn't work properly for complex types - pigFieldSchema.alias = field.getName(); - pigSchema.add(pigFieldSchema); - } - log.info("Loaded Schema: "+pigSchema); - return pigSchema; + @Override + public void setPartitionFilter(Expression partitionFilter) + throws IOException { + } + + @Override + public String[] getPartitionKeys(String location, Configuration conf) + throws IOException { + return null; } + //------------------------------------------------------------------------ + // Implementation of StoreMetadata + @Override - public void finish() throws IOException { - super.finish(); - JobConf jobConf = PigMapReduce.sJobConf; - if(jobConf != null){ - StoreConfig storeConfig = MapRedUtil.getStoreConfig(jobConf); - DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(jobConf)); - Schema schema = storeConfig.getSchema(); - ResourceSchema resourceSchema = new ResourceSchema(schema); - JsonMetadata metadataWriter = new JsonMetadata(storeConfig.getLocation(), store); - metadataWriter.setFieldDel(fieldDel); - metadataWriter.setRecordDel(recordDel); - metadataWriter.setSchema(resourceSchema, storeConfig.getLocation(), null); - } - } - - /** - * @see org.apache.pig.experimental.StoreMetadata#setSchema(ResourceSchema) - * Does not do anything in this implementation. The finish() call writes the schema. - */ - @Override - public void setSchema(ResourceSchema schema, String location, Configuration conf) throws IOException { - // n\a - } - - /** - * @see org.apache.pig.experimental.StoreMetadata#setStatistics(ResourceStatistics) - * Does not do anything in this implementation. 
- */ + public void storeSchema(ResourceSchema schema, String location, + Configuration conf) throws IOException { + JsonMetadata metadataWriter = new JsonMetadata(); + byte fieldDel = '\t'; + byte recordDel = '\n'; + metadataWriter.setFieldDel(fieldDel); + metadataWriter.setRecordDel(recordDel); + metadataWriter.storeSchema(schema, location, conf); + } + @Override - public void setStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException { - // n\a + public void storeStatistics(ResourceStatistics stats, String location, + Configuration conf) throws IOException { + } }
Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java Wed Jan 20 20:08:28 2010 @@ -14,24 +14,23 @@ package org.apache.pig.piggybank.storage; import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.pig.ExecType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.pig.LoadFunc; - -import org.apache.pig.backend.datastorage.DataStorage; -import org.apache.pig.builtin.Utf8StorageConverter; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DefaultTupleFactory; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.FrontendException; /** * RegExLoader is an abstract class used to parse logs based on a 
regular expression. @@ -42,35 +41,32 @@ * Look to org.apache.pig.piggybank.storage.apachelog.CommonLogLoader for example usage. */ -public abstract class RegExLoader extends Utf8StorageConverter implements LoadFunc { - protected BufferedPositionedInputStream in = null; - long end = Long.MAX_VALUE; - private byte recordDel = (byte) '\n'; - private String fieldDel = "\t"; - final private static Charset utf8 = Charset.forName("UTF8"); - OutputStream os; - +public abstract class RegExLoader extends LoadFunc { + private LineRecordReader in = null; + abstract public Pattern getPattern(); - public RegExLoader() { - } - + @Override public Tuple getNext() throws IOException { - if (in == null || in.getPosition() > end) { + if (!in.nextKeyValue()) { return null; } + Pattern pattern = getPattern(); Matcher matcher = pattern.matcher(""); TupleFactory mTupleFactory = DefaultTupleFactory.getInstance(); String line; + boolean tryNext = true; while (tryNext) { - if ((line = in.readLine(utf8, recordDel)) == null) { + Text val = in.getCurrentValue(); + if (val == null) { break; } - if (line.length() > 0 && line.charAt(line.length() - 1) == '\r') + line = val.toString(); + if (line.length() > 0 && line.charAt(line.length() - 1) == '\r') { line = line.substring(0, line.length() - 1); - + } matcher = matcher.reset(line); ArrayList<DataByteArray> list = new ArrayList<DataByteArray>(); if (matcher.find()) { @@ -82,39 +78,25 @@ } } - return null; } - - public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException { - this.in = in; - this.end = end; - - // Since we are not block aligned we throw away the first - // record and could on a different instance to read it - if (offset != 0) { - getNext(); - } - } - - public void bindTo(OutputStream os) throws IOException { - this.os = os; - } - - public void putNext(Tuple f) throws IOException { - os.write((f.toDelimitedString(this.fieldDel) + (char) this.recordDel).getBytes("utf8")); - } - - 
public void finish() throws IOException { + + @SuppressWarnings("unchecked") + @Override + public InputFormat getInputFormat() throws IOException { + return new TextInputFormat(); } - public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException { - return null; + @SuppressWarnings("unchecked") + @Override + public void prepareToRead(RecordReader reader, PigSplit split) + throws IOException { + in = (LineRecordReader) reader; } @Override - public LoadFunc.RequiredFieldResponse fieldsToRead(LoadFunc.RequiredFieldList requiredFieldList) throws FrontendException { - return new LoadFunc.RequiredFieldResponse(false); + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); } } Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java Wed Jan 20 20:08:28 2010 @@ -20,40 +20,31 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; import 
org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.pig.impl.logicalLayer.FrontendException; - -import org.apache.pig.ExecType; -import org.apache.pig.LoadFunc; -import org.apache.pig.SamplableLoader; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader; +import org.apache.pig.FileInputLoadFunc; import org.apache.pig.backend.BackendException; -import org.apache.pig.backend.datastorage.DataStorage; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.data.DataBag; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.schema.Schema; /** * A Loader for Hadoop-Standard SequenceFiles. 
@@ -61,10 +52,10 @@ * Text, IntWritable, LongWritable, FloatWritable, DoubleWritable, BooleanWritable, ByteWritable **/ -public class SequenceFileLoader implements LoadFunc, SamplableLoader { +public class SequenceFileLoader extends FileInputLoadFunc { - private SequenceFile.Reader reader; - private long end; + private SequenceFileRecordReader<Writable, Writable> reader; + private Writable key; private Writable value; private ArrayList<Object> mProtoTuple = null; @@ -73,74 +64,29 @@ protected TupleFactory mTupleFactory = TupleFactory.getInstance(); protected SerializationFactory serializationFactory; - protected byte keyType; - protected byte valType; + protected byte keyType = DataType.UNKNOWN; + protected byte valType = DataType.UNKNOWN; public SequenceFileLoader() { - + mProtoTuple = new ArrayList<Object>(2); } - - @Override - public void bindTo(String fileName, BufferedPositionedInputStream is, - long offset, long end) throws IOException { - - inferReader(fileName); - if (offset != 0) - reader.sync(offset); - - this.end = end; - - try { - this.key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), PigMapReduce.sJobConf); - this.value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), PigMapReduce.sJobConf); - } catch (ClassCastException e) { - throw new RuntimeException("SequenceFile contains non-Writable objects", e); - } - setKeyValueTypes(key.getClass(), value.getClass()); - } - - @Override - @SuppressWarnings("unchecked") - public Schema determineSchema(String fileName, ExecType execType, - DataStorage storage) throws IOException { - inferReader(fileName); - Class<Writable> keyClass = null; - Class<Writable> valClass= null; - try { - keyClass = (Class<Writable>) reader.getKeyClass(); - valClass = (Class<Writable>) reader.getValueClass(); - } catch (ClassCastException e) { - throw new RuntimeException("SequenceFile contains non-Writable objects", e); - } - Schema schema = new Schema(); - setKeyValueTypes(keyClass, valClass); - 
schema.add(new Schema.FieldSchema(null, keyType)); - schema.add(new Schema.FieldSchema(null, valType)); - return schema; - } - - protected void setKeyValueTypes(Class<?> keyClass, Class<?> valueClass) throws BackendException { + + protected void setKeyType(Class<?> keyClass) throws BackendException { this.keyType |= inferPigDataType(keyClass); - this.valType |= inferPigDataType(valueClass); if (keyType == DataType.ERROR) { LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype"); throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype"); } - if (valType == DataType.ERROR) { - LOG.warn("Unable to translate value "+value.getClass()+" to a Pig datatype"); - throw new BackendException("Unable to translate "+value.getClass()+" to a Pig datatype"); - } - - } - protected void inferReader(String fileName) throws IOException { - if (reader == null) { - Configuration conf = new Configuration(); - Path path = new Path(fileName); - FileSystem fs = FileSystem.get(path.toUri(), conf); - reader = new SequenceFile.Reader(fs, path, conf); - } } + protected void setValueType(Class<?> valueClass) throws BackendException { + this.valType |= inferPigDataType(valueClass); + if (keyType == DataType.ERROR) { + LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype"); + throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype"); + } + } + protected byte inferPigDataType(Type t) { if (t == DataByteArray.class) return DataType.BYTEARRAY; else if (t == Text.class) return DataType.CHARARRAY; @@ -169,77 +115,48 @@ } @Override - public LoadFunc.RequiredFieldResponse fieldsToRead(LoadFunc.RequiredFieldList requiredFieldList) throws FrontendException { - return new LoadFunc.RequiredFieldResponse(false); - } - - @Override public Tuple getNext() throws IOException { - if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>(2); - if (reader != null && (reader.getPosition() < end || !reader.syncSeen()) && 
reader.next(key, value)) { - mProtoTuple.add(translateWritableToPigDataType(key, keyType)); - mProtoTuple.add(translateWritableToPigDataType(value, valType)); - Tuple t = mTupleFactory.newTuple(mProtoTuple); - mProtoTuple.clear(); - return t; + boolean next = false; + try { + next = reader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException(e); } - return null; - } - - @Override - public long getPosition() throws IOException { - return reader.getPosition(); - } - - @Override - public Tuple getSampledTuple() throws IOException { - return this.getNext(); - } - - @Override - public long skip(long n) throws IOException { - long startPos = reader.getPosition(); - reader.sync(startPos+n); - return reader.getPosition()-startPos; - } - - @Override - public DataBag bytesToBag(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); - } - - @Override - public String bytesToCharArray(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); - } - - @Override - public Double bytesToDouble(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); - } - - @Override - public Float bytesToFloat(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); - } - - @Override - public Integer bytesToInteger(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); + + if (!next) return null; + + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + + if (keyType == DataType.UNKNOWN && key != null) { + setKeyType(key.getClass()); + } + if (valType == DataType.UNKNOWN && value != null) { + setValueType(value.getClass()); + } + + mProtoTuple.add(translateWritableToPigDataType(key, keyType)); + mProtoTuple.add(translateWritableToPigDataType(value, valType)); + Tuple t = 
mTupleFactory.newTuple(mProtoTuple); + mProtoTuple.clear(); + return t; } + @SuppressWarnings("unchecked") @Override - public Long bytesToLong(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); + public InputFormat getInputFormat() throws IOException { + return new SequenceFileInputFormat<Writable, Writable>(); } + @SuppressWarnings("unchecked") @Override - public Map<String, Object> bytesToMap(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); + public void prepareToRead(RecordReader reader, PigSplit split) + throws IOException { + this.reader = (SequenceFileRecordReader) reader; } @Override - public Tuple bytesToTuple(byte[] b) throws IOException { - throw new FrontendException("SequenceFileLoader does not expect to cast data."); + public void setLocation(String location, Job job) throws IOException { + FileInputFormat.setInputPaths(job, location); } } Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java Wed Jan 20 20:08:28 2010 @@ -13,21 +13,15 @@ package org.apache.pig.piggybank.test.storage; -import static org.apache.pig.ExecType.LOCAL; - -import java.io.InputStream; import java.util.ArrayList; import java.util.Iterator; import 
junit.framework.TestCase; -import org.apache.pig.PigServer; - import org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader; import org.junit.Test; @@ -84,27 +78,6 @@ } @Test - public void testLoadFromBindTo() throws Exception { - String filename = TestHelper.createTempFile(data, " "); - CombinedLogLoader combinedLogLoader = new CombinedLogLoader(); - PigServer pigServer = new PigServer(LOCAL); - InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext()); - combinedLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE); - - int tupleCount = 0; - - while (true) { - Tuple tuple = combinedLogLoader.getNext(); - if (tuple == null) - break; - else { - TestHelper.examineTuple(EXPECTED, tuple, tupleCount); - tupleCount++; - } - } - assertEquals(data.size(), tupleCount); - } - public void testLoadFromPigServer() throws Exception { String filename = TestHelper.createTempFile(data, " "); PigServer pig = new PigServer(ExecType.LOCAL); Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java 
Wed Jan 20 20:08:28 2010 @@ -13,21 +13,15 @@ package org.apache.pig.piggybank.test.storage; -import static org.apache.pig.ExecType.LOCAL; - -import java.io.InputStream; import java.util.ArrayList; import java.util.Iterator; import junit.framework.TestCase; -import org.apache.pig.PigServer; - import org.apache.pig.ExecType; +import org.apache.pig.PigServer; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.piggybank.storage.apachelog.CommonLogLoader; import org.junit.Test; @@ -75,28 +69,6 @@ } @Test - public void testLoadFromBindTo() throws Exception { - String filename = TestHelper.createTempFile(data, " "); - CommonLogLoader commonLogLoader = new CommonLogLoader(); - PigServer pigServer = new PigServer(LOCAL); - - InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext()); - commonLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE); - - int tupleCount = 0; - - while (true) { - Tuple tuple = commonLogLoader.getNext(); - if (tuple == null) - break; - else { - TestHelper.examineTuple(EXPECTED, tuple, tupleCount); - tupleCount++; - } - } - assertEquals(data.size(), tupleCount); - } - public void testLoadFromPigServer() throws Exception { String filename = TestHelper.createTempFile(data, " "); PigServer pig = new PigServer(ExecType.LOCAL); Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- 
hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Wed Jan 20 20:08:28 2010 @@ -150,7 +150,7 @@ .getLocal(new Configuration()) : cluster.getFileSystem()); Path output = new Path(outPath); Assert.assertTrue("Output dir does not exists!", fs.exists(output) - && fs.isDirectory(output)); + && fs.getFileStatus(output).isDir()); Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter)); Assert.assertTrue("Split field dirs not found!", paths != null); @@ -161,19 +161,26 @@ Assert.assertTrue("No files found for path: " + path.toUri().getPath(), files != null); for (Path filePath : files) { - if (fs.isFile(filePath)) { - BufferedReader reader = new BufferedReader(new InputStreamReader(fs - .open(filePath))); - String line = ""; - while ((line = reader.readLine()) != null) { - String[] fields = line.split("\\t"); - Assert.assertEquals(fields.length, 3); - Assert.assertEquals("Unexpected field value in the output record", + Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath)); + + BufferedReader reader = new BufferedReader(new InputStreamReader(fs + .open(filePath))); + String line = ""; + int count = 0; + while ((line = reader.readLine()) != null) { + String[] fields = line.split("\\t"); + Assert.assertEquals(fields.length, 3); + Assert.assertEquals("Unexpected field value in the output record", splitField, fields[1]); - } - reader.close(); - } + count++; + System.out.println("field: " + fields[1]); + } + reader.close(); + Assert.assertEquals(count, 3); } } } } + + + Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java Wed Jan 20 20:08:28 2010 @@ -15,8 +15,8 @@ import static org.apache.pig.ExecType.LOCAL; -import java.io.InputStream; import java.util.ArrayList; +import java.util.Iterator; import java.util.regex.Pattern; import junit.framework.TestCase; @@ -24,21 +24,21 @@ import org.apache.pig.PigServer; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.io.BufferedPositionedInputStream; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.piggybank.storage.RegExLoader; +import org.apache.pig.test.Util; import org.junit.Test; public class TestRegExLoader extends TestCase { private static String patternString = "(\\w+),(\\w+);(\\w+)"; private final static Pattern pattern = Pattern.compile(patternString); - class DummyRegExLoader extends RegExLoader { + public static class DummyRegExLoader extends RegExLoader { + public DummyRegExLoader() {} + @Override public Pattern getPattern() { return Pattern.compile(patternString); } - } public static ArrayList<String[]> data = new ArrayList<String[]>(); @@ -49,32 +49,26 @@ } @Test - public void testLoadFromBindTo() throws Exception { - //String filename = TestHelper.createTempFile(data, " "); - //System.err.println(filename); - DummyRegExLoader dummyRegExLoader = new DummyRegExLoader(); + public void testLoadFromBindTo() throws Exception { PigServer pigServer = new PigServer(LOCAL); String filename = 
TestHelper.createTempFile(data, ""); - /*org.apache.pig.test.Util.createInputFile("tmp", "", - new String[]{"1,one;i", "2,two;ii", "3,three;iii"} - ); - - String filename = input.getAbsolutePath(); - */ - InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext()); - dummyRegExLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE); ArrayList<DataByteArray[]> expected = TestHelper.getExpected(data, pattern); + + pigServer.registerQuery("A = LOAD 'file:" + Util.encodeEscape(filename) + + "' USING " + DummyRegExLoader.class.getName() + "() AS (key, val);"); + Iterator<?> it = pigServer.openIterator("A"); int tupleCount = 0; - while (true) { - Tuple tuple = dummyRegExLoader.getNext(); + while (it.hasNext()) { + Tuple tuple = (Tuple) it.next(); if (tuple == null) - break; + break; else { - TestHelper.examineTuple(expected, tuple, tupleCount); - tupleCount++; + TestHelper.examineTuple(expected, tuple, tupleCount); + tupleCount++; } - } + } assertEquals(data.size(), tupleCount); } + } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java Wed Jan 20 20:08:28 2010 @@ -20,7 +20,6 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.impl.plan.OperatorPlan; /** * This interface defines how to retrieve metadata related to data to be loaded. 
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java Wed Jan 20 20:08:28 2010 @@ -17,28 +17,241 @@ */ package org.apache.pig; -public class ResourceStatistics { +import java.io.Serializable; +import java.util.Arrays; - public static class ResourceFieldStatistics { +public class ResourceStatistics implements Cloneable { - int version; + /* Getters intentionally return mutable arrays instead of copies, + * to simplify updates without unnecessary copying. + * Setters make a copy of the arrays in order to prevent an array + * from being shared by two objects, with modifications in one + * accidentally changing the other. + */ + + // arrays are initialized to empty so we don't have to worry about NPEs + // setters disallow setting them to null. 
+ + private static final long serialVersionUID = 1L; + public Long mBytes; // size in megabytes + public Long numRecords; // number of records + public Long avgRecordSize; + public ResourceFieldStatistics[] fields = new ResourceFieldStatistics[0]; - enum Distribution {UNIFORM, NORMAL, POWER}; + public static class ResourceFieldStatistics implements Serializable { - public long numDistinctValues; // number of distinct values represented in this field - public Distribution distribution; // how values in this field are distributed + public static final long serialVersionUID = 1L; + + public int version; + + public Long numDistinctValues; // number of distinct values represented in this field // We need some way to represent a histogram of values in the field, // as those will be useful. However, we can't count on being // able to hold such histograms in memory. Have to figure out // how they can be kept on disk and represented here. - // Probably more in here + // for now.. don't create so many buckets you can't hold them in memory + + // an ordered array of the most common values, + // in descending order of frequency + public Object[] mostCommonValues = new Object[0]; + + // an array that matches the mostCommonValues array, and lists + // the frequencies of those values as a fraction (0 through 1) of + // the total number of records + public float[] mostCommonValuesFreq = new float[0]; + + // an ordered array of values, from min val to max val + // such that the number of records with values + // between valueHistogram[i] and valueHistogram[i+1] is + // roughly equal for all values of i. + // NOTE: if mostCommonValues is non-empty, the values in that array + // should not be included in the histogram. Adjust accordingly.
+ public Object[] valueHistogram = new Object[0]; + + + public int getVersion() { + return version; + } + + public ResourceFieldStatistics setVersion(int version) { + this.version = version; + return this; + } + + public Long getNumDistinctValues() { + return numDistinctValues; + } + + public ResourceFieldStatistics setNumDistinctValues(Long numDistinctValues) { + this.numDistinctValues = numDistinctValues; + return this; + } + + public Object[] getMostCommonValues() { + return mostCommonValues; + } + + public ResourceFieldStatistics setMostCommonValues(Object[] mostCommonValues) { + if (mostCommonValues !=null) + this.mostCommonValues = + Arrays.copyOf(mostCommonValues, mostCommonValues.length); + return this; + } + + public float[] getMostCommonValuesFreq() { + return mostCommonValuesFreq; + } + + public ResourceFieldStatistics setMostCommonValuesFreq(float[] mostCommonValuesFreq) { + if (mostCommonValuesFreq != null) + this.mostCommonValuesFreq = + Arrays.copyOf(mostCommonValuesFreq, mostCommonValuesFreq.length); + return this; + } + + public Object[] getValueHistogram() { + return valueHistogram; + } + + public ResourceFieldStatistics setValueHistogram(Object[] valueHistogram) { + if (valueHistogram != null) + this.valueHistogram = Arrays.copyOf(valueHistogram, valueHistogram.length); + return this; + } + + + /* + * equals() and hashCode() overridden mostly for ease of testing + * you shouldn't encounter a situation in which you need to .equals() + * two sets of statistics on different objects "in the wild" + */ + @Override + public boolean equals(Object anOther) { + if (anOther == null || !(anOther.getClass().equals(this.getClass()))) + return false; + ResourceFieldStatistics other = (ResourceFieldStatistics) anOther; + // setters do not allow null values, so no worries about NPEs here + return (Arrays.equals(mostCommonValues, other.mostCommonValues) && + Arrays.equals(mostCommonValuesFreq, other.mostCommonValuesFreq) && + Arrays.equals(valueHistogram, 
other.valueHistogram) && + (this.numDistinctValues == null ? other.numDistinctValues == null : this.numDistinctValues.equals(other.numDistinctValues)) /* null-safe: numDistinctValues is a Long with no initializer */ && + this.version == other.version + ); + } + + /** + * A naive hashCode implementation following the example in IBM's developerworks: + * http://www.ibm.com/developerworks/java/library/j-jtp05273.html + */ + @Override + public int hashCode() { + int hash = 1; + hash = 31 * hash + Arrays.hashCode(mostCommonValues); + hash = 31 * hash + Arrays.hashCode(mostCommonValuesFreq); + hash = 31 * hash + (numDistinctValues == null ? 0 : numDistinctValues.hashCode()); /* an unset (null) count hashes as 0 */ + hash = 31 * hash + Arrays.hashCode(valueHistogram); + hash = 31 * hash + version; + return hash; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ResourceStatistics. Version: "+version+"\n"); + sb.append("MCV:\n"); + for (Object o : mostCommonValues) sb.append('['+ o.toString() +']'); + sb.append("\n MCVfreq:\n"); + for (Float f : mostCommonValuesFreq) sb.append('['+f.toString()+']'); + sb.append("\n"); + sb.append("numDistVals: "+numDistinctValues); + sb.append("valHistogram: \n"); + for (Object o : valueHistogram) sb.append('['+o.toString()+']'); + sb.append("\n"); + return sb.toString(); + } + } + + + public Long getmBytes() { + return mBytes; + } + public ResourceStatistics setmBytes(Long mBytes) { + this.mBytes = mBytes; + return this; + } + public Long getNumRecords() { + return numRecords; + } + public ResourceStatistics setNumRecords(Long numRecords) { + this.numRecords = numRecords; + return this; + } + + /* + * returns average record size. This number can be explicitly specified by statistics, or + * if absent, computed as mBytes/numRecords (mBytes is the size in megabytes). Will return null if can't be computed.
+ */ + public Long getAvgRecordSize() { + if (avgRecordSize == null && (mBytes != null && numRecords != null && numRecords.longValue() != 0L)) /* a zero record count cannot be divided by; fall through and return null */ + return mBytes / numRecords; + else + return avgRecordSize; + } + + public void setAvgRecordSize(Long size) { + avgRecordSize = size; + } + + public ResourceFieldStatistics[] getFields() { + return fields; + } + + public ResourceStatistics setFields(ResourceFieldStatistics[] fields) { + if (fields != null) + this.fields = Arrays.copyOf(fields, fields.length); + return this; } - public long mBytes; // size in megabytes - public long numRecords; // number of records - public ResourceFieldStatistics[] fields; + /* + * equals() and hashCode() overridden mostly for ease of testing + * you shouldn't encounter a situation in which you need to .equals() + * two sets of statistics on different objects "in the wild" + */ + @Override + public boolean equals(Object anOther) { + if (anOther == null || !(anOther.getClass().equals(this.getClass()))) + return false; + ResourceStatistics other = (ResourceStatistics) anOther; + return (Arrays.equals(fields, other.fields) && + ((mBytes==null) + ? (other.mBytes==null) : mBytes.equals(other.mBytes)) && + ((numRecords == null) + ? (other.numRecords==null) : numRecords.equals(other.numRecords)) + ); + } + + @Override + public int hashCode() { + int hash = 1; + hash = 31*hash + Arrays.hashCode(fields); + hash = 31*hash + (mBytes == null ? 0 : mBytes.hashCode()); + hash = 31*hash + (numRecords == null ?
0 : numRecords.hashCode()); + return hash; + } // Probably more in here + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("Field Stats: \n"); + for (ResourceFieldStatistics f : fields) sb.append(f.toString()); + sb.append("mBytes: "+mBytes); + sb.append("numRecords: "+numRecords); + return sb.toString(); + } + + public Object clone() throws CloneNotSupportedException { + return ((ResourceStatistics) super.clone()).setFields(fields); /* setFields() copies the array, so the clone does not share it with this object */ + } } \ No newline at end of file Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Wed Jan 20 20:08:28 2010 @@ -15,8 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig; - +package org.apache.pig; + import java.io.IOException; import org.apache.hadoop.fs.Path; @@ -24,14 +24,15 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.pig.data.Tuple; - - -/** -* This interface is used to implement functions to write records -* from a dataset. -* -* -*/ +import org.apache.pig.impl.util.UDFContext; + + +/** +* This interface is used to implement functions to write records +* from a dataset.
+* +* +*/ public interface StoreFunc { @@ -118,4 +119,4 @@ * @param signature a unique signature to identify this StoreFunc */ public void setStoreFuncUDFContextSignature(String signature); -} +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Wed Jan 20 20:08:28 2010 @@ -22,13 +22,18 @@ import java.util.LinkedList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.pig.ResourceSchema; import org.apache.pig.StoreFunc; +import org.apache.pig.StoreMetadata; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; @@ -65,7 +70,6 @@ */ public PigOutputCommitter(TaskAttemptContext context) throws IOException { - // create and store the map and reduce output committers mapOutputCommitters = getCommitters(context, JobControlCompiler.PIG_MAP_STORES); @@ -148,21 +152,36 @@ return contextCopy; } + private void storeCleanup(POStore store, 
Configuration conf) + throws IOException { + StoreFunc storeFunc = store.getStoreFunc(); + if (storeFunc instanceof StoreMetadata) { + Schema schema = store.getSchema(); + if (schema != null) { + ((StoreMetadata) storeFunc).storeSchema( + new ResourceSchema(schema), store.getSFile() + .getFileName(), conf); + } + } + } + /* (non-Javadoc) * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext) */ @Override public void cleanupJob(JobContext context) throws IOException { // call clean up on all map and reduce committers - for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { + for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { JobContext updatedContext = setUpContext(context, mapCommitter.second); + storeCleanup(mapCommitter.second, context.getConfiguration()); mapCommitter.first.cleanupJob(updatedContext); } for (Pair<OutputCommitter, POStore> reduceCommitter : - reduceOutputCommitters) { + reduceOutputCommitters) { JobContext updatedContext = setUpContext(context, reduceCommitter.second); + storeCleanup(reduceCommitter.second, context.getConfiguration()); reduceCommitter.first.cleanupJob(updatedContext); } @@ -172,7 +191,7 @@ * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override - public void abortTask(TaskAttemptContext context) throws IOException { + public void abortTask(TaskAttemptContext context) throws IOException { if(context.getTaskAttemptID().isMap()) { for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Wed Jan 20 20:08:28 2010 @@ -371,7 +371,7 @@ @Override public void checkSchema(ResourceSchema s) throws IOException { - throw new UnsupportedOperationException(); + } @Override Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java?rev=901360&r1=901359&r2=901360&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java Wed Jan 20 20:08:28 2010 @@ -28,7 +28,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.pig.PigException; -import org.apache.pig.experimental.ResourceSchema; +import org.apache.pig.ResourceSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema;