Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreMigrationGuide" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=2&rev2=3

--------------------------------------------------

          }
      }
  
-     public long getPosition() throws IOException {
-         return in.getPosition();
-     }
- 
-     public long skip(long n) throws IOException {
-         
-         long skipped = in.skip(n-1);
-         prevByte = (byte)in.read();
-         if(prevByte == -1) // End of stream.
-             return skipped;
-         else
-             return skipped+1;
-     }
- 
      public Tuple getNext() throws IOException {
          if (in == null || in.getPosition() > end) {
              return null;
@@ -86, +72 @@

                  readField();
              } else if (b == recordDel) {
                  readField();
-                 //Tuple t =  mTupleFactory.newTuple(mProtoTuple);
                  Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
                  mProtoTuple = null;
                  return t;
@@ -99, +84 @@

          }
      }
  
-     public Tuple getSampledTuple() throws IOException {
-        
-         if(prevByte == null || prevByte == recordDel) 
-             // prevByte = null when this is called for the first time, in that case bindTo would have already
-             // called getNext() if it was required.
-         return getNext();
-         
-         else{   // We are in middle of record. So, we skip this and return the next one.
-             getNext();
-             return getNext();            
-         }
-     }
- 
      public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
          this.in = in;
          this.end = end;
@@ -123, +95 @@

          }
      }
      
-     OutputStream mOut;
-     public void bindTo(OutputStream os) throws IOException {
-         mOut = os;
-     }
- 
-     private void putField(Object field) throws IOException {
-         //string constants for each delimiter
-         String tupleBeginDelim = "(";
-         String tupleEndDelim = ")";
-         String bagBeginDelim = "{";
-         String bagEndDelim = "}";
-         String mapBeginDelim = "[";
-         String mapEndDelim = "]";
-         String fieldDelim = ",";
-         String mapKeyValueDelim = "#";
- 
-         switch (DataType.findType(field)) {
-         case DataType.NULL:
-             break; // just leave it empty
- 
-         case DataType.BOOLEAN:
-             mOut.write(((Boolean)field).toString().getBytes());
-             break;
- 
-         case DataType.INTEGER:
-             mOut.write(((Integer)field).toString().getBytes());
-             break;
- 
-         case DataType.LONG:
-             mOut.write(((Long)field).toString().getBytes());
-             break;
- 
-         case DataType.FLOAT:
-             mOut.write(((Float)field).toString().getBytes());
-             break;
- 
-         case DataType.DOUBLE:
-             mOut.write(((Double)field).toString().getBytes());
-             break;
- 
-         case DataType.BYTEARRAY: {
-             byte[] b = ((DataByteArray)field).get();
-             mOut.write(b, 0, b.length);
-             break;
-                                  }
- 
-         case DataType.CHARARRAY:
-             // oddly enough, writeBytes writes a string
-             mOut.write(((String)field).getBytes(UTF8));
-             break;
- 
-         case DataType.MAP:
-             boolean mapHasNext = false;
-             Map<String, Object> m = (Map<String, Object>)field;
-             mOut.write(mapBeginDelim.getBytes(UTF8));
-             for(String s: m.keySet()) {
-                 if(mapHasNext) {
-                     mOut.write(fieldDelim.getBytes(UTF8));
-                 } else {
-                     mapHasNext = true;
-                 }
-                 putField(s);
-                 mOut.write(mapKeyValueDelim.getBytes(UTF8));
-                 putField(m.get(s));
-             }
-             mOut.write(mapEndDelim.getBytes(UTF8));
-             break;
- 
-         case DataType.TUPLE:
-             boolean tupleHasNext = false;
-             Tuple t = (Tuple)field;
-             mOut.write(tupleBeginDelim.getBytes(UTF8));
-             for(int i = 0; i < t.size(); ++i) {
-                 if(tupleHasNext) {
-                     mOut.write(fieldDelim.getBytes(UTF8));
-                 } else {
-                     tupleHasNext = true;
-                 }
-                 try {
-                     putField(t.get(i));
-                 } catch (ExecException ee) {
-                     throw ee;
-                 }
-             }
-             mOut.write(tupleEndDelim.getBytes(UTF8));
-             break;
- 
-         case DataType.BAG:
-             boolean bagHasNext = false;
-             mOut.write(bagBeginDelim.getBytes(UTF8));
-             Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
-             while(tupleIter.hasNext()) {
-                 if(bagHasNext) {
-                     mOut.write(fieldDelim.getBytes(UTF8));
-                 } else {
-                     bagHasNext = true;
-                 }
-                 putField((Object)tupleIter.next());
-             }
-             mOut.write(bagEndDelim.getBytes(UTF8));
-             break;
-             
-         default: {
-             int errCode = 2108;
-             String msg = "Could not determine data type of field: " + field;
-             throw new ExecException(msg, errCode, PigException.BUG);
-         }
-         
-         }
-     }
- 
-     public void putNext(Tuple f) throws IOException {
-         // I have to convert integer fields to string, and then to bytes.
-         // If I use a DataOutputStream to convert directly from integer to
-         // bytes, I don't get a string representation.
-         int sz = f.size();
-         for (int i = 0; i < sz; i++) {
-             Object field;
-             try {
-                 field = f.get(i);
-             } catch (ExecException ee) {
-                 throw ee;
-             }
- 
-             putField(field);
- 
-             if (i == sz - 1) {
-                 // last field in tuple.
-                 mOut.write(recordDel);
-             } else {
-                 mOut.write(fieldDel);
-             }
-         }
-     }
- 
-     public void finish() throws IOException {
-     }
- 
      private void readField() {
          if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>();
          if (mBuf.size() == 0) {
              // NULL value
              mProtoTuple.add(null);
          } else {
+             
-             // TODO, once this can take schemas, we need to figure out
-             // if the user requested this to be viewed as a certain
-             // type, and if so, then construct it appropriately.
              byte[] array = mBuf.toByteArray();
-             if (array[array.length-1]=='\r' && os==OS_WINDOWS) {
-                 // This is a java 1.6 function.  Until pig officially moves to
-                 // 1.6 we can't use this.
-                 // array = Arrays.copyOf(array, array.length-1);
-                 byte[] tmp = new byte[array.length - 1];
-                 for (int i = 0; i < array.length - 1; i++) tmp[i] = array[i];
-                 array = tmp;
-             }
-                 
              if (array.length==0)
                  mProtoTuple.add(null);
              else
@@ -288, +111 @@

          mBuf.reset();
      }
  
-     /* (non-Javadoc)
-      * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
-      */
      public Schema determineSchema(String fileName, ExecType execType,
              DataStorage storage) throws IOException {
-         // TODO Auto-generated method stub
          return null;
      }
  
@@ -301, +120 @@

          // do nothing
      }
      
+     public RequiredFieldResponse fieldsToRead(RequiredFieldList requiredFieldList)
+     throws FrontendException {
+         // indicate to pig that this loader will return all fields and not just
+         // the required fields passed in the argument.
+         return new RequiredFieldResponse(false);
-     public boolean equals(Object obj) {
-         return equals((PigStorage)obj);
-     }
- 
-     public boolean equals(PigStorage other) {
-         return this.fieldDel == other.fieldDel;
-     }
- 
-     /* (non-Javadoc)
-      * @see org.apache.pig.StoreFunc#getStorePreparationClass()
-      */
-    
-     public Class getStorePreparationClass() throws IOException {
-         // TODO Auto-generated method stub
-         return null;
      }
   }
  
  }}}
  == New Implementation ==
+ {{{
+ public class PigStorage extends LoadFunc {
+     protected RecordReader in = null;               
+     private byte fieldDel = '\t';
+     private ArrayList<Object> mProtoTuple = null;
+     private TupleFactory mTupleFactory = TupleFactory.getInstance();
+     private static final int BUFFER_SIZE = 1024;
+     
+     public PigStorage() {
+     }
+     
+     /**
+      * Constructs a Pig loader that uses specified character as a field 
delimiter.
+      * 
+      * @param delimiter
+      *            the single byte character that is used to separate fields.
+      *            ("\t" is the default.)
+      */
+     public PigStorage(String delimiter) {
+         this();
+         if (delimiter.length() == 1) {
+             this.fieldDel = (byte)delimiter.charAt(0);
+         } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
+             switch (delimiter.charAt(1)) {
+             case 't':
+                 this.fieldDel = (byte)'\t';
+                 break;
  
+             case 'x':
+             case 'u':
+                 this.fieldDel =
+                     Integer.valueOf(delimiter.substring(2)).byteValue();
+                 break;
+ 
+             default:                
+                 throw new RuntimeException("Unknown delimiter " + delimiter);
+             }
+         } else {            
+             throw new RuntimeException("PigStorage delimeter must be a single 
character");
+         }
+     }
+ 
+     @Override
+     public Tuple getNext() throws IOException {
+         try {
+             boolean notDone = in.nextKeyValue();
+             if (!notDone) {
+                 return null;
+             }                                                                 
                          
+             Text value = (Text) in.getCurrentValue();
+             byte[] buf = value.getBytes();
+             int len = value.getLength();
+             int start = 0;
+ 
+             for (int i = 0; i < len; i++) {
+                 if (buf[i] == fieldDel) {
+                     readField(buf, start, i);
+                     start = i + 1;                   
+                 }
+             }
+             // pick up the last field
+             readField(buf, start, len);
+             
+             Tuple t =  mTupleFactory.newTupleNoCopy(mProtoTuple);
+             mProtoTuple = null;
+             return t;
+         } catch (InterruptedException e) {
+             int errCode = 6018;
+             String errMsg = "Error while reading input";
+             throw new ExecException(errMsg, errCode, 
+                     PigException.REMOTE_ENVIRONMENT, e);
+         }
+       
+     }
+ 
+     private void readField(byte[] buf, int start, int end) {
+         if (mProtoTuple == null) {
+             mProtoTuple = new ArrayList<Object>();
+         }
+ 
+         if (start == end) {
+             // NULL value
+             mProtoTuple.add(null);
+         } else {
+             mProtoTuple.add(new DataByteArray(buf, start, end));
+         }
+     }
+ 
+     @Override
+     public InputFormat getInputFormat() {
+         return new TextInputFormat();
+     }
+ 
+     @Override
+     public void prepareToRead(RecordReader reader, PigSplit split) {
+         in = reader;
+     }
+ 
+     @Override
+     public void setLocation(String location, Job job)
+             throws IOException {
+         FileInputFormat.setInputPaths(job, location);
+     }    
+ }
+ 
+ }}}
+ 

Reply via email to