Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Tue Oct 27 01:13:19 2009 @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Map; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.pig.ExecType; import org.apache.pig.LoadFunc; import org.apache.pig.SamplableLoader; @@ -39,16 +40,18 @@ * Abstract class that specifies the interface for sample loaders * */ +//XXX : FIXME - make this work with new load-store redesign public abstract class SampleLoader implements LoadFunc { protected int numSamples; protected long skipInterval; - protected SamplableLoader loader; + protected LoadFunc loader; private TupleFactory factory; + private boolean initialized = false; public SampleLoader(String funcSpec) { - loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec); + loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec); } public void setNumSamples(int n) { @@ -59,133 +62,21 @@ return numSamples; } - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long) - */ - public void bindTo(String fileName, BufferedPositionedInputStream is, - long offset, long end) throws IOException { - skipInterval = (end - offset)/numSamples; - loader.bindTo(fileName, is, offset, end); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToBag(byte[]) - */ - public DataBag bytesToBag(byte[] b) throws IOException { - return loader.bytesToBag(b); - } - - /* (non-Javadoc) - * 
@see org.apache.pig.LoadFunc#bytesToCharArray(byte[]) - */ - public String bytesToCharArray(byte[] b) throws IOException { - return loader.bytesToCharArray(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToDouble(byte[]) - */ - public Double bytesToDouble(byte[] b) throws IOException { - return loader.bytesToDouble(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToFloat(byte[]) - */ - public Float bytesToFloat(byte[] b) throws IOException { - return loader.bytesToFloat(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToInteger(byte[]) - */ - public Integer bytesToInteger(byte[] b) throws IOException { - return loader.bytesToInteger(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToLong(byte[]) - */ - public Long bytesToLong(byte[] b) throws IOException { - return loader.bytesToLong(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToMap(byte[]) - */ - public Map<String, Object> bytesToMap(byte[] b) throws IOException { - return loader.bytesToMap(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#bytesToTuple(byte[]) - */ - public Tuple bytesToTuple(byte[] b) throws IOException { - return loader.bytesToTuple(b); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage) - */ - public Schema determineSchema(String fileName, ExecType execType, - DataStorage storage) throws IOException { - return loader.determineSchema(fileName, execType, storage); - } - - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema) - */ - public void fieldsToRead(Schema schema) { - loader.fieldsToRead(schema); - } + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getInputFormat() + */ + @Override + public InputFormat getInputFormat() { + return loader.getInputFormat(); + } - /* (non-Javadoc) + /* (non-Javadoc) * @see 
org.apache.pig.LoadFunc#getNext() */ public Tuple getNext() throws IOException { - long initialPos = loader.getPosition(); - - // we move to next boundry - Tuple t = loader.getSampledTuple(); - long finalPos = loader.getPosition(); - - long toSkip = skipInterval - (finalPos - initialPos); - if (toSkip > 0) { - long rc = loader.skip(toSkip); - - // if we did not skip enough - // in the first attempt, call - // in.skip() repeatedly till we - // skip enough - long remainingSkip = toSkip - rc; - while(remainingSkip > 0) { - rc = loader.skip(remainingSkip); - if(rc == 0) { - // underlying stream saw EOF - break; - } - remainingSkip -= rc; - } - } - - if (t == null) { - return null; - } - - if (factory == null) { - factory = TupleFactory.getInstance(); - } - - // copy existing field - Tuple m = factory.newTuple(t.size()+1); - for(int i=0; i<t.size(); i++) { - m.set(i, t.get(i)); - } - - // add size of the tuple at the end - m.set(t.size(), (finalPos-initialPos) + 1); // offset 1 for null - - return m; + // estimate how many tuples there are in the map + // based on the + return null; } public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java?rev=830041&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java Tue Oct 27 01:13:19 2009 @@ -0,0 +1,30 @@ +/** + * + */ +package org.apache.pig.impl.io; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.pig.data.Tuple; + +/** + * + */ +public class BinStorageInputFormat extends FileInputFormat<Text, Tuple> { + + /* (non-Javadoc) + * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext) + */ + @Override + public RecordReader<Text, Tuple> createRecordReader(InputSplit split, + TaskAttemptContext context) throws IOException, + InterruptedException { + return new BinStorageRecordReader(); + } + +} Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java?rev=830041&view=auto ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java Tue Oct 27 01:13:19 2009 @@ -0,0 +1,145 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.io; + +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataReaderWriter; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +/** + * Treats keys as offset in file and value as line. 
+ */ +public class BinStorageRecordReader extends RecordReader<Text, Tuple> { + + private long start; + private long pos; + private long end; + private BufferedPositionedInputStream in; + private Tuple value = null; + public static final int RECORD_1 = 0x01; + public static final int RECORD_2 = 0x02; + public static final int RECORD_3 = 0x03; + private DataInputStream inData = null; + + public void initialize(InputSplit genericSplit, + TaskAttemptContext context) throws IOException { + FileSplit split = (FileSplit) genericSplit; + Configuration job = context.getConfiguration(); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + if (start != 0) { + fileIn.seek(start); + } + in = new BufferedPositionedInputStream(fileIn, start); + inData = new DataInputStream(in); + } + + public boolean nextKeyValue() throws IOException { + int b = 0; + // skip to next record + while (true) { + if (in == null || in.getPosition() >=end) { + return false; + } + // check if we saw RECORD_1 in our last attempt + // this can happen if we have the following + // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3 + // After reading the second RECORD_1 in the above + // sequence, we should not look for RECORD_1 again + if(b != RECORD_1) { + b = in.read(); + if(b != RECORD_1 && b != -1) { + continue; + } + if(b == -1) return false; + } + b = in.read(); + if(b != RECORD_2 && b != -1) { + continue; + } + if(b == -1) return false; + b = in.read(); + if(b != RECORD_3 && b != -1) { + continue; + } + if(b == -1) return false; + b = in.read(); + if(b != DataType.TUPLE && b != -1) { + continue; + } + if(b == -1) return false; + break; + } + try { + // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER + // sequence - lets now read the contents of the tuple + value = 
(Tuple)DataReaderWriter.readDatum(inData, DataType.TUPLE); + return true; + } catch (ExecException ee) { + throw ee; + } + + } + + @Override + public Text getCurrentKey() { + // the key is always null since we don't really have a key for each + // input record + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + /** + * Get the progress within the split + */ + public float getProgress() { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float)(end - start)); + } + } + + public synchronized void close() throws IOException { + if (in != null) { + in.close(); + } + } +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Tue Oct 27 01:13:19 2009 @@ -152,15 +152,7 @@ * @throws IOException */ public static InputStream openDFSFile(String fileName) throws IOException { - SliceWrapper wrapper = PigInputFormat.getActiveSplit(); - - Configuration conf = null; - if (wrapper == null) { - conf = PigMapReduce.sJobConf; - }else{ - conf = wrapper.getJobConf(); - } - + Configuration conf = PigMapReduce.sJobConf; if (conf == null) { throw new RuntimeException( "can't open DFS file while executing locally"); @@ -177,14 +169,7 @@ } public static long getSize(String fileName) throws IOException { - SliceWrapper wrapper = PigInputFormat.getActiveSplit(); - - Configuration conf = null; - if (wrapper == null) { - conf = PigMapReduce.sJobConf; - }else{ - conf = wrapper.getJobConf(); - } + Configuration conf = PigMapReduce.sJobConf; if (conf == 
null) { throw new RuntimeException( Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java Tue Oct 27 01:13:19 2009 @@ -31,6 +31,8 @@ import org.apache.pig.impl.io.FileLocalizer; +// XXX: FIXME: make this work with load store redesign + public class PigFile { private String file = null; boolean append = false; @@ -47,7 +49,8 @@ public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException { DataBag content = BagFactory.getInstance().newDefaultBag(); InputStream is = FileLocalizer.open(file, pigContext); - lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); + //XXX FIXME: make this work with new load-store redesign +// lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); Tuple f = null; while ((f = lfunc.getNext()) != null) { content.add(f); @@ -58,12 +61,12 @@ public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException { BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext)); - sfunc.bindTo(bos); +// sfunc.bindTo(bos); for (Iterator<Tuple> it = data.iterator(); it.hasNext();) { Tuple row = it.next(); sfunc.putNext(row); } - sfunc.finish(); +// sfunc.finish(); bos.close(); } Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=830041&view=auto ============================================================================== 
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java (added) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Tue Oct 27 01:13:19 2009 @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.pig.LoadCaster; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.OperatorKey; + +/** + * This is wrapper Loader which wraps a real LoadFunc underneath and allows + * to read a file completely starting a given split (indicated by a split index + * which is used to look in the List<InputSplit> returned by the underlying + * InputFormat's getSplits() method). So if the supplied split index is 0, this + * loader will read the entire file. If it is non zero it will read the partial + * file beginning from that split to the last split. 
+ * + * The call sequence to use this is: + * 1) construct an object using the constructor + * 2) Call getNext() in a loop till it returns null + */ +public class ReadToEndLoader implements LoadFunc { + + /** + * the wrapped LoadFunc which will do the actual reading + */ + private LoadFunc wrappedLoadFunc; + + /** + * the Configuration object used to locate the input location - this will + * be used to call {@link LoadFunc#setLocation(String, Configuration)} on + * the wrappedLoadFunc + */ + private Configuration conf; + + /** + * the input location string (typically input file/dir name ) + */ + private String inputLocation; + + /** + * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}) + * to start reading from + */ + private int startSplitIndex; + + /** + * the index of the split the loader is currently reading from + */ + private int curSplitIndex; + + /** + * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)} + */ + private List<InputSplit> splits; + + /** + * underlying RecordReader + */ + private RecordReader reader; + + /** + * underlying InputFormat + */ + private InputFormat inputFormat; + + /** + * @param wrappedLoadFunc + * @param conf + * @param inputLocation + * @param splitIndex + * @throws IOException + * @throws InterruptedException + */ + public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, + String inputLocation, int splitIndex) throws IOException { + this.wrappedLoadFunc = wrappedLoadFunc; + // make a copy so that if the underlying InputFormat writes to the + // conf, we don't affect the caller's copy + this.conf = new Configuration(conf); + this.inputLocation = inputLocation; + this.startSplitIndex = splitIndex; + this.curSplitIndex = startSplitIndex; + + // let's initialize the wrappedLoadFunc + Job job = new Job(this.conf); + wrappedLoadFunc.setLocation(this.inputLocation, + job); + // The above setLocation call could write to the conf within 
+ // the job - get a hold of the modified conf + this.conf = job.getConfiguration(); + inputFormat = wrappedLoadFunc.getInputFormat(); + try { + splits = inputFormat.getSplits(new JobContext(this.conf, + new JobID())); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private boolean initializeReader() throws IOException, + InterruptedException { + if(curSplitIndex > splits.size() - 1) { + // past the last split, we are done + return false; + } + + InputSplit curSplit = splits.get(curSplitIndex); + TaskAttemptContext tAContext = new TaskAttemptContext(conf, + new TaskAttemptID()); + reader = inputFormat.createRecordReader(curSplit, tAContext); + reader.initialize(curSplit, tAContext); + // create a dummy pigsplit - other than the actual split, the other + // params are really not needed here where we are just reading the + // input completely + PigSplit pigSplit = new PigSplit(curSplit, -1, + new ArrayList<OperatorKey>(), -1); + wrappedLoadFunc.prepareToRead(reader, pigSplit); + return true; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getNext() + */ + public Tuple getNext() throws IOException { + try { + Tuple t = null; + if(reader == null) { + // first call + return getNextHelper(); + } else { + // we already have a reader initialized + t = wrappedLoadFunc.getNext(); + if(t != null) { + return t; + } + // if loadfunc returned null, we need to read next split + // if there is one + curSplitIndex++; + return getNextHelper(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private Tuple getNextHelper() throws IOException, InterruptedException { + Tuple t = null; + while(initializeReader()) { + t = wrappedLoadFunc.getNext(); + if(t == null) { + // try next split + curSplitIndex++; + } else { + return t; + } + } + // we processed all splits - we are done + wrappedLoadFunc.doneReading(); + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#doneReading() + */ + @Override + 
public void doneReading() { + throw new RuntimeException("Internal Error: Unimplemented method called!"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getInputFormat() + */ + @Override + public InputFormat getInputFormat() { + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#getLoadCaster() + */ + @Override + public LoadCaster getLoadCaster() { + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit) + */ + @Override + public void prepareToRead(RecordReader reader, PigSplit split) { + throw new RuntimeException("Internal Error: Unimplemented method called!"); + } + + /* (non-Javadoc) + * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job) + */ + @Override + public void setLocation(String location, Job job) throws IOException { + throw new RuntimeException("Internal Error: Unimplemented method called!"); + } + +} Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Oct 27 01:13:19 2009 @@ -27,8 +27,11 @@ import java.util.Set; import org.apache.pig.ExecType; import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; import org.apache.pig.PigException; +import org.apache.pig.ResourceSchema; import org.apache.pig.backend.datastorage.DataStorage; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.data.DataType; import 
org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; @@ -38,12 +41,14 @@ import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.MultiMap; import org.apache.pig.impl.util.Pair; +import org.apache.pig.impl.util.PropertiesUtil; import org.apache.pig.impl.util.WrappedIOException; import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.Job; public class LOLoad extends RelationalOperator { private static final long serialVersionUID = 2L; @@ -145,8 +150,8 @@ } if(null == mDeterminedSchema) { - mSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage); - mDeterminedSchema = mSchema; + mSchema = determineSchema(); + mDeterminedSchema = mSchema; } mIsSchemaComputed = true; } catch (IOException ioe) { @@ -161,6 +166,22 @@ return mSchema; } + private Schema determineSchema() throws IOException { + if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) { + // XXX: FIXME - mStorage should no longer be needed, we + // should use Configuration directly by passing a + // Configuration object while creating LOLoad rather than + // a DataStorage object + mLoadFunc.setLocation(mInputFileSpec.getFileName(), + new Job(ConfigurationUtil.toConfiguration( + mStorage.getConfiguration()))); + LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc; + ResourceSchema rSchema = loadMetadata.getSchema(); + return Schema.getPigSchema(rSchema); + } else { + return null; + } + } /* (non-Javadoc) * @see org.apache.pig.impl.logicalLayer.LogicalOperator#setSchema(org.apache.pig.impl.logicalLayer.schema.Schema) */ @@ -253,7 +274,7 @@ } } else { try { - inputSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage); + inputSchema = determineSchema(); } catch (IOException ioe) { 
mProjectionMap = null; return mProjectionMap; Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Oct 27 01:13:19 2009 @@ -28,6 +28,8 @@ import java.util.Collection; import org.apache.pig.PigException; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.data.DataType; //import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.commons.logging.Log; @@ -672,7 +674,7 @@ public FieldSchema getField(String alias) throws FrontendException { FieldSchema fs = mAliases.get(alias); if(null == fs) { - String cocoPrefix = "::" + alias; + String cocoPrefix = new String("::" + alias); Map<String, Integer> aliasMatches = new HashMap<String, Integer>(); //build the map of aliases that have cocoPrefix as the suffix for(String key: mAliases.keySet()) { @@ -798,7 +800,7 @@ if (aliases != null) { List<String> listAliases = new ArrayList<String>(); for(String alias: aliases) { - listAliases.add(alias); + listAliases.add(new String(alias)); } for(String alias: listAliases) { log.debug("Removing alias " + alias + " from multimap"); @@ -1597,6 +1599,34 @@ this.twoLevelAccessRequired = twoLevelAccess; } + public static Schema getPigSchema(ResourceSchema rSchema) + throws FrontendException { + List<FieldSchema> fsList = new ArrayList<FieldSchema>(); + for(ResourceFieldSchema rfs : rSchema.fields) { + FieldSchema fs = new FieldSchema(rfs.name, rfs.schema == null ? 
null: + getPigSchema(rfs.schema), rfs.type); + + // check if we have a need to set twoLevelAcccessRequired flag + if(rfs.type == DataType.BAG) { + if(fs.schema.size() == 1) { + FieldSchema innerFs = fs.schema.getField(0); + if(innerFs.type == DataType.TUPLE && innerFs.schema != null) { + fs.schema.setTwoLevelAccessRequired(true); + } + } + } + fsList.add(fs); + } + return new Schema(fsList); + } + + private static Schema getPigSchema(ResourceFieldSchema rfSchema) + throws FrontendException { + return new Schema(new FieldSchema(rfSchema.name, + rfSchema.schema == null ? null : getPigSchema(rfSchema.schema), + rfSchema.type)); + } + } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java Tue Oct 27 01:13:19 2009 @@ -31,6 +31,7 @@ * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS} * manner via an external file which is subsequently read by the executable. 
*/ +//XXX: FIXME make this work with new load store redesign public abstract class InputHandler { /** * @@ -77,7 +78,7 @@ */ public synchronized void close(Process process) throws IOException { if(!alreadyClosed) { - serializer.finish(); +// serializer.finish(); alreadyClosed = true; } } @@ -91,6 +92,6 @@ * @throws IOException */ public void bindTo(OutputStream os) throws IOException { - serializer.bindTo(os); +// serializer.bindTo(os); } } Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Oct 27 01:13:19 2009 @@ -62,8 +62,9 @@ */ public void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException { - deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), - offset, end); + // XXX: FIXME - make this work with new load-store redesign +// deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), +// offset, end); } /** Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=830041&r1=830040&r2=830041&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java Tue Oct 27 01:13:19 2009 @@ -32,6 +32,7 @@ 
import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -62,7 +63,7 @@ lookupTable = new HashMap<String, Boolean>(); - Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob); + Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf); InputStream is = FileLocalizer.openDFSFile(FilterFileName, props); BufferedReader reader = new BufferedReader(new InputStreamReader(is));