Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/SampleLoader.java Tue Oct 27 01:13:19 2009
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Map;
 
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.SamplableLoader;
@@ -39,16 +40,18 @@
  * Abstract class that specifies the interface for sample loaders
  *
  */
+//XXX : FIXME - make this work with new load-store redesign
 public abstract class SampleLoader implements LoadFunc {
 
        protected int numSamples;
        protected long skipInterval;
-    protected SamplableLoader loader;
+    protected LoadFunc loader;
        private TupleFactory factory;
+       private boolean initialized = false;
 
     
     public SampleLoader(String funcSpec) {
-       loader = (SamplableLoader)PigContext.instantiateFuncFromSpec(funcSpec);
+       loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
     }
     
     public void setNumSamples(int n) {
@@ -59,133 +62,21 @@
        return numSamples;
     }
     
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bindTo(java.lang.String, org.apache.pig.impl.io.BufferedPositionedInputStream, long, long)
-        */
-       public void bindTo(String fileName, BufferedPositionedInputStream is,
-                       long offset, long end) throws IOException {
-        skipInterval = (end - offset)/numSamples;
-        loader.bindTo(fileName, is, offset, end);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToBag(byte[])
-        */
-       public DataBag bytesToBag(byte[] b) throws IOException {
-        return loader.bytesToBag(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToCharArray(byte[])
-        */
-       public String bytesToCharArray(byte[] b) throws IOException {
-        return loader.bytesToCharArray(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToDouble(byte[])
-        */
-       public Double bytesToDouble(byte[] b) throws IOException {
-        return loader.bytesToDouble(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToFloat(byte[])
-        */
-       public Float bytesToFloat(byte[] b) throws IOException {
-        return loader.bytesToFloat(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToInteger(byte[])
-        */
-       public Integer bytesToInteger(byte[] b) throws IOException {
-               return loader.bytesToInteger(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToLong(byte[])
-        */
-       public Long bytesToLong(byte[] b) throws IOException {
-        return loader.bytesToLong(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToMap(byte[])
-        */
-       public Map<String, Object> bytesToMap(byte[] b) throws IOException {
-        return loader.bytesToMap(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#bytesToTuple(byte[])
-        */
-       public Tuple bytesToTuple(byte[] b) throws IOException {
-        return loader.bytesToTuple(b);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
-        */
-       public Schema determineSchema(String fileName, ExecType execType,
-                       DataStorage storage) throws IOException {
-        return loader.determineSchema(fileName, execType, storage);
-       }
-
-       /* (non-Javadoc)
-        * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
-        */
-       public void fieldsToRead(Schema schema) {
-        loader.fieldsToRead(schema);
-       }
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getInputFormat()
+     */
+    @Override
+    public InputFormat getInputFormat() {
+        return loader.getInputFormat();
+    }
 
-       /* (non-Javadoc)
+    /* (non-Javadoc)
         * @see org.apache.pig.LoadFunc#getNext()
         */
        public Tuple getNext() throws IOException {
-       long initialPos = loader.getPosition();
-        
-        // we move to next boundry
-        Tuple t = loader.getSampledTuple();        
-        long finalPos = loader.getPosition();
-
-        long toSkip = skipInterval - (finalPos - initialPos);
-        if (toSkip > 0) {
-            long rc = loader.skip(toSkip);
-            
-            // if we did not skip enough
-            // in the first attempt, call
-            // in.skip() repeatedly till we
-            // skip enough
-            long remainingSkip = toSkip - rc;
-            while(remainingSkip > 0) {
-                rc = loader.skip(remainingSkip);
-                if(rc == 0) {
-                    // underlying stream saw EOF
-                    break;
-                }
-                remainingSkip -= rc;
-            }
-        }       
-        
-        if (t == null) {
-               return null;
-        }
-        
-               if (factory == null) {
-               factory = TupleFactory.getInstance();
-        }
-
-        // copy existing field 
-        Tuple m = factory.newTuple(t.size()+1);
-        for(int i=0; i<t.size(); i++) {
-               m.set(i, t.get(i));
-        }
-        
-        // add size of the tuple at the end
-        m.set(t.size(), (finalPos-initialPos) + 1); // offset 1 for null
-        
-        return m;              
+          // estimate how many tuples there are in the map
+          // based on the 
+           return null;   
        }
 
        public void computeSamples(ArrayList<Pair<FileSpec, Boolean>> inputs, PigContext pc) throws ExecException {
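
A minimal sketch (not part of this commit) of how the stubbed getNext() above
might be completed under the new API: byte offsets are no longer visible
through LoadFunc, so this sketch reinterprets skipInterval as a number of
tuples to skip between samples rather than a byte distance.

    public Tuple getNext() throws IOException {
        // consume skipInterval tuples from the wrapped loader, then
        // return the next one as the sample
        for (long i = 0; i < skipInterval; i++) {
            if (loader.getNext() == null) {
                return null; // underlying input exhausted
            }
        }
        return loader.getNext();
    }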

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageInputFormat.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,30 @@
+/**
+ * 
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.data.Tuple;
+
+/**
+ *
+ */
+public class BinStorageInputFormat extends FileInputFormat<Text, Tuple> {
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+            TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        return new BinStorageRecordReader();
+    }
+
+}
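
BinStorageInputFormat is presumably wired in through a loader's
getInputFormat(), the same hook SampleLoader now delegates to above. A
hypothetical loader method:

    @Override
    public InputFormat getInputFormat() {
        // let Pig split and read the input as BinStorage data
        return new BinStorageInputFormat();
    }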

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/BinStorageRecordReader.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * RecordReader for BinStorage data: the key is unused (always null) and the
+ * value is the next Tuple decoded from the stream.
+ */
+public class BinStorageRecordReader extends RecordReader<Text, Tuple> {
+
+  private long start;
+  private long pos;
+  private long end;
+  private BufferedPositionedInputStream in;
+  private Tuple value = null;
+  public static final int RECORD_1 = 0x01;
+  public static final int RECORD_2 = 0x02;
+  public static final int RECORD_3 = 0x03;
+  private DataInputStream inData = null;
+
+  public void initialize(InputSplit genericSplit,
+                         TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    start = split.getStart();
+    end = start + split.getLength();
+    final Path file = split.getPath();
+
+    // open the file and seek to the start of the split
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    if (start != 0) {
+        fileIn.seek(start);
+    }
+    in = new BufferedPositionedInputStream(fileIn, start);
+    inData = new DataInputStream(in);
+  }
+  
+  public boolean nextKeyValue() throws IOException {
+      int b = 0;
+      //    skip to next record
+      while (true) {
+          if (in == null || in.getPosition() >=end) {
+              return false;
+          }
+          // check if we saw RECORD_1 in our last attempt
+          // this can happen if we have the following 
+          // sequence RECORD_1-RECORD_1-RECORD_2-RECORD_3
+          // After reading the second RECORD_1 in the above
+          // sequence, we should not look for RECORD_1 again
+          if(b != RECORD_1) {
+              b = in.read();
+              if(b != RECORD_1 && b != -1) {
+                  continue;
+              }
+              if(b == -1) return false;
+          }
+          b = in.read();
+          if(b != RECORD_2 && b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          b = in.read();
+          if(b != RECORD_3 && b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          b = in.read();
+          if(b != DataType.TUPLE && b != -1) {
+              continue;
+          }
+          if(b == -1) return false;
+          break;
+      }
+      try {
+          // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+          // sequence - let's now read the contents of the tuple 
+          value = (Tuple)DataReaderWriter.readDatum(inData, DataType.TUPLE);
+          return true;
+      } catch (ExecException ee) {
+          throw ee;
+      }
+
+  }
+
+  @Override
+  public Text getCurrentKey() {
+      // the key is always null since we don't really have a key for each
+      // input record
+      return null;
+  }
+
+  @Override
+  public Tuple getCurrentValue() {
+    return value;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (pos - start) / (float)(end - start));
+    }
+  }
+  
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close(); 
+    }
+  }
+}
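
nextKeyValue() above scans for the RECORD_1, RECORD_2, RECORD_3 sync bytes
followed by a DataType.TUPLE marker. For reference, a hedged sketch of the
matching writer-side framing (the store path is not part of this commit, and
writeRecord is a hypothetical helper):

    // write one framed record; DataReaderWriter.writeDatum emits the
    // DataType.TUPLE marker byte before the tuple's fields, completing
    // the sequence the reader scans for
    private void writeRecord(DataOutputStream out, Tuple t) throws IOException {
        out.write(BinStorageRecordReader.RECORD_1);
        out.write(BinStorageRecordReader.RECORD_2);
        out.write(BinStorageRecordReader.RECORD_3);
        DataReaderWriter.writeDatum(out, t);
    }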

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Tue Oct 27 01:13:19 2009
@@ -152,15 +152,7 @@
      * @throws IOException
      */
     public static InputStream openDFSFile(String fileName) throws IOException {
-        SliceWrapper wrapper = PigInputFormat.getActiveSplit();
-
-        Configuration conf = null;
-        if (wrapper == null) {
-               conf = PigMapReduce.sJobConf;
-        }else{
-               conf = wrapper.getJobConf();
-        }
-        
+        Configuration conf = PigMapReduce.sJobConf;
         if (conf == null) {
             throw new RuntimeException(
                     "can't open DFS file while executing locally");
@@ -177,14 +169,7 @@
     }
     
     public static long getSize(String fileName) throws IOException {
-       SliceWrapper wrapper = PigInputFormat.getActiveSplit();
-       
-       Configuration conf = null;
-       if (wrapper == null) {
-               conf = PigMapReduce.sJobConf;
-       }else{
-               conf = wrapper.getJobConf();
-       }
+       Configuration conf = PigMapReduce.sJobConf;
 
        if (conf == null) {
                throw new RuntimeException(

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java Tue Oct 27 01:13:19 2009
@@ -31,6 +31,8 @@
 import org.apache.pig.impl.io.FileLocalizer;
 
 
+// XXX: FIXME: make this work with load store redesign
+
 public class PigFile {
     private String file = null;
     boolean append = false;
@@ -47,7 +49,8 @@
     public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
         DataBag content = BagFactory.getInstance().newDefaultBag();
         InputStream is = FileLocalizer.open(file, pigContext);
-        lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+        //XXX FIXME: make this work with new load-store redesign
+//        lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
         Tuple f = null;
         while ((f = lfunc.getNext()) != null) {
             content.add(f);
@@ -58,12 +61,12 @@
     
     public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
         BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
-        sfunc.bindTo(bos);
+//        sfunc.bindTo(bos);
         for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
             Tuple row = it.next();
             sfunc.putNext(row);
         }
-        sfunc.finish();
+//        sfunc.finish();
         bos.close();
     }
 

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=830041&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/ReadToEndLoader.java Tue Oct 27 01:13:19 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * This is a wrapper Loader which wraps a real LoadFunc underneath and allows
+ * a file to be read completely, starting at a given split (indicated by a
+ * split index, which is used to look up the split in the List<InputSplit>
+ * returned by the underlying InputFormat's getSplits() method). So if the
+ * supplied split index is 0, this loader will read the entire file. If it is
+ * non-zero, it will read the partial file beginning from that split through
+ * the last split.
+ * 
+ * The call sequence to use this is:
+ * 1) construct an object using the constructor
+ * 2) Call getNext() in a loop till it returns null
+ */
+public class ReadToEndLoader implements LoadFunc {
+
+    /**
+     * the wrapped LoadFunc which will do the actual reading
+     */
+    private LoadFunc wrappedLoadFunc;
+    
+    /**
+     * the Configuration object used to locate the input location - this will
+     * be used to call {@link LoadFunc#setLocation(String, Configuration)} on
+     * the wrappedLoadFunc
+     */
+    private Configuration conf;
+    
+    /**
+     * the input location string (typically input file/dir name )
+     */
+    private String inputLocation;
+    
+    /**
+     * the index of the split (in {@link InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)})
+     * to start reading from
+     */
+    private int startSplitIndex;
+
+    /**
+     * the index of the split the loader is currently reading from
+     */
+    private int curSplitIndex;
+    
+    /**
+     * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
+     */
+    private List<InputSplit> splits;
+    
+    /**
+     * underlying RecordReader
+     */
+    private RecordReader reader;
+    
+    /**
+     * underlying InputFormat
+     */
+    private InputFormat inputFormat;
+    
+    /**
+     * @param wrappedLoadFunc
+     * @param conf
+     * @param inputLocation
+     * @param splitIndex
+     * @throws IOException 
+     * @throws InterruptedException 
+     */
+    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+            String inputLocation, int splitIndex) throws IOException {
+        this.wrappedLoadFunc = wrappedLoadFunc;
+        // make a copy so that if the underlying InputFormat writes to the
+        // conf, we don't affect the caller's copy
+        this.conf = new Configuration(conf);
+        this.inputLocation = inputLocation;
+        this.startSplitIndex = splitIndex;
+        this.curSplitIndex = startSplitIndex;
+        
+        // let's initialize the wrappedLoadFunc 
+        Job job = new Job(this.conf);
+        wrappedLoadFunc.setLocation(this.inputLocation, 
+                job);
+        // The above setLocation call could write to the conf within
+        // the job - get a hold of the modified conf
+        this.conf = job.getConfiguration();
+        inputFormat = wrappedLoadFunc.getInputFormat();
+        try {
+            splits = inputFormat.getSplits(new JobContext(this.conf,
+                    new JobID()));
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    private boolean initializeReader() throws IOException, 
+    InterruptedException {
+        if(curSplitIndex > splits.size() - 1) {
+            // past the last split, we are done
+            return false;
+        }
+        
+        InputSplit curSplit = splits.get(curSplitIndex);
+        TaskAttemptContext tAContext = new TaskAttemptContext(conf, 
+                new TaskAttemptID());
+        reader = inputFormat.createRecordReader(curSplit, tAContext);
+        reader.initialize(curSplit, tAContext);
+        // create a dummy pigsplit - other than the actual split, the other
+        // params are really not needed here where we are just reading the
+        // input completely
+        PigSplit pigSplit = new PigSplit(curSplit, -1, 
+                new ArrayList<OperatorKey>(), -1);
+        wrappedLoadFunc.prepareToRead(reader, pigSplit);
+        return true;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getNext()
+     */
+    public Tuple getNext() throws IOException {
+        try {
+            Tuple t = null;
+            if(reader == null) {
+                // first call
+                return getNextHelper();
+            } else {
+                // we already have a reader initialized
+                t = wrappedLoadFunc.getNext();
+                if(t != null) {
+                    return t;
+                }
+                // if loadfunc returned null, we need to read next split
+                // if there is one
+                curSplitIndex++;
+                return getNextHelper();
+            }
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    private Tuple getNextHelper() throws IOException, InterruptedException {
+        Tuple t = null;
+        while(initializeReader()) {
+            t = wrappedLoadFunc.getNext();
+            if(t == null) {
+                // try next split
+                curSplitIndex++;
+            } else {
+                return t;
+            }
+        }
+        // we processed all splits - we are done
+        wrappedLoadFunc.doneReading();
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#doneReading()
+     */
+    @Override
+    public void doneReading() {
+        throw new RuntimeException("Internal Error: Unimplemented method called!");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getInputFormat()
+     */
+    @Override
+    public InputFormat getInputFormat() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#getLoadCaster()
+     */
+    @Override
+    public LoadCaster getLoadCaster() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader, org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit)
+     */
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) {
+        throw new RuntimeException("Internal Error: Unimplemented method called!");
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#setLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        throw new RuntimeException("Internal Error: Unimplemented method called!");
+    }
+   
+}
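
Per the class Javadoc, usage is construct-then-drain. A hypothetical caller,
assuming BinStorage (or any LoadFunc ported to the new API) and a placeholder
input path:

    Configuration conf = new Configuration();
    LoadFunc wrapped = new BinStorage(); // illustrative choice of loader
    ReadToEndLoader loader =
            new ReadToEndLoader(wrapped, conf, "/user/pig/input", 0);
    Tuple t;
    while ((t = loader.getNext()) != null) {
        // every tuple from split 0 through the last split arrives here
    }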

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Oct 27 01:13:19 2009
@@ -27,8 +27,11 @@
 import java.util.Set;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -38,12 +41,14 @@
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
 
 public class LOLoad extends RelationalOperator {
     private static final long serialVersionUID = 2L;
@@ -145,8 +150,8 @@
                 }
 
                 if(null == mDeterminedSchema) {
-                    mSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
-                    mDeterminedSchema  = mSchema;
+                    mSchema = determineSchema();
+                    mDeterminedSchema  = mSchema;    
                 }
                 mIsSchemaComputed = true;
             } catch (IOException ioe) {
@@ -161,6 +166,22 @@
         return mSchema;
     }
     
+    private Schema determineSchema() throws IOException {
+        if(LoadMetadata.class.isAssignableFrom(mLoadFunc.getClass())) {
+            // XXX: FIXME - mStorage should no longer be needed, we
+            // should use Configuration directly by passing a 
+            // Configuration object while creating LOLoad rather than
+            // a DataStorage object
+            mLoadFunc.setLocation(mInputFileSpec.getFileName(), 
+                    new Job(ConfigurationUtil.toConfiguration(
+                            mStorage.getConfiguration())));
+            LoadMetadata loadMetadata = (LoadMetadata)mLoadFunc;
+            ResourceSchema rSchema = loadMetadata.getSchema();
+            return Schema.getPigSchema(rSchema);
+        } else {
+            return null;
+        }
+    }
     /* (non-Javadoc)
     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#setSchema(org.apache.pig.impl.logicalLayer.schema.Schema)
      */
@@ -253,7 +274,7 @@
             }
         } else {
             try {
-                inputSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
+                inputSchema = determineSchema();
             } catch (IOException ioe) {
                 mProjectionMap = null;
                 return mProjectionMap;
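
Since determineSchema() consults only loaders that implement LoadMetadata and
returns null otherwise, the isAssignableFrom test on the instance's class
behaves like a plain instanceof check; the new method body is roughly
equivalent to (setLocation call elided):

    if (mLoadFunc instanceof LoadMetadata) {
        ResourceSchema rSchema = ((LoadMetadata) mLoadFunc).getSchema();
        return Schema.getPigSchema(rSchema);
    }
    return null;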

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Oct 27 01:13:19 2009
@@ -28,6 +28,8 @@
 import java.util.Collection;
 
 import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
 //import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.commons.logging.Log;
@@ -672,7 +674,7 @@
     public FieldSchema getField(String alias) throws FrontendException {
         FieldSchema fs = mAliases.get(alias);
         if(null == fs) {
-            String cocoPrefix = "::" + alias;
+            String cocoPrefix = new String("::" + alias);
             Map<String, Integer> aliasMatches = new HashMap<String, Integer>();
             //build the map of aliases that have cocoPrefix as the suffix
             for(String key: mAliases.keySet()) {
@@ -798,7 +800,7 @@
                         if (aliases != null) {
                             List<String> listAliases = new ArrayList<String>();
                             for(String alias: aliases) {
-                                listAliases.add(alias);
+                                listAliases.add(new String(alias));
                             }
                             for(String alias: listAliases) {
                                log.debug("Removing alias " + alias + " from multimap");
@@ -1597,6 +1599,34 @@
         this.twoLevelAccessRequired = twoLevelAccess;
     }
     
+    public static Schema getPigSchema(ResourceSchema rSchema) 
+    throws FrontendException {
+        List<FieldSchema> fsList = new ArrayList<FieldSchema>();
+        for(ResourceFieldSchema rfs : rSchema.fields) {
+            FieldSchema fs = new FieldSchema(rfs.name, rfs.schema == null ? null:
+                getPigSchema(rfs.schema), rfs.type);
+            
+            // check if we have a need to set twoLevelAcccessRequired flag
+            if(rfs.type == DataType.BAG) {
+                if(fs.schema.size() == 1) {
+                    FieldSchema innerFs = fs.schema.getField(0);
+                    if(innerFs.type == DataType.TUPLE && innerFs.schema != null) {
+                        fs.schema.setTwoLevelAccessRequired(true);
+                    }
+                }
+            }
+            fsList.add(fs);
+        }
+        return new Schema(fsList);
+    }
+    
+    private static Schema getPigSchema(ResourceFieldSchema rfSchema) 
+    throws FrontendException {
+        return new Schema(new FieldSchema(rfSchema.name, rfSchema.schema == null ? null : getPigSchema(rfSchema.schema),
+                        rfSchema.type));
+    }
+    
 }
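
The new getPigSchema() conversion marks a bag whose single inner field is a
tuple with its own schema, preserving Pig's legacy two-level bag access. A
hedged usage sketch, assuming rSchema came from a LoadMetadata implementation,
contains a bag field named "b", and that the getter paired with
setTwoLevelAccessRequired exists:

    Schema pigSchema = Schema.getPigSchema(rSchema);
    Schema.FieldSchema bag = pigSchema.getField("b");
    // for b: bag{t: (x: int)}, the inner schema is flagged for
    // two-level access
    boolean twoLevel = bag.schema.isTwoLevelAccessRequired();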
 
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/InputHandler.java Tue Oct 27 01:13:19 2009
@@ -31,6 +31,7 @@
  * manner via its <code>stdin</code> or in an {@link InputType#ASYNCHRONOUS} 
  * manner via an external file which is subsequently read by the executable.
  */
+//XXX: FIXME make this work with new load store redesign
 public abstract class InputHandler {
     /**
      * 
@@ -77,7 +78,7 @@
      */
     public synchronized void close(Process process) throws IOException {
         if(!alreadyClosed) {
-            serializer.finish();
+//            serializer.finish();
             alreadyClosed = true;
         }
     }
@@ -91,6 +92,6 @@
      * @throws IOException
      */
     public void bindTo(OutputStream os) throws IOException {
-        serializer.bindTo(os);
+//        serializer.bindTo(os);
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Oct 27 01:13:19 2009
@@ -62,8 +62,9 @@
      */
     public void bindTo(String fileName, BufferedPositionedInputStream is,
                        long offset, long end) throws IOException {
-        deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), 
-                            offset, end);
+        // XXX: FIXME - make this work with new load-store redesign
+//        deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), 
+//                            offset, end);
     }
     
     /**

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java?rev=830041&r1=830040&r2=830041&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/FILTERFROMFILE.java Tue Oct 27 01:13:19 2009
@@ -32,6 +32,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -62,7 +63,7 @@
            
                lookupTable = new HashMap<String, Boolean>();
                
-               Properties props = ConfigurationUtil.toProperties(PigInputFormat.sJob);
+               Properties props = ConfigurationUtil.toProperties(PigMapReduce.sJobConf);
                InputStream is = FileLocalizer.openDFSFile(FilterFileName, props);
 
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));

