Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreMigrationGuide" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=13&rev2=14

--------------------------------------------------

      
      public SimpleTextStorer() {}
  
-     /**
-      * Constructs a Pig storer that uses specified character as a field delimiter.
-      * 
-      * @param delimiter
-      *            the single byte character that is used to separate fields.
-      *            ("\t" is the default.)
-      */
      public SimpleTextStorer(String delimiter) {
          this();
          if (delimiter.length() == 1) {
@@ -460, +453 @@

  === New Implementation ===
  
  {{{
+ public class SimpleTextStorer implements StoreFunc {
+     protected RecordWriter writer = null;
+         
+     private byte fieldDel = '\t';
+     private static final int BUFFER_SIZE = 1024;
+     
+     public PigStorage() {
+     }
+     
+     public PigStorage(String delimiter) {
+         this();
+         if (delimiter.length() == 1) {
+             this.fieldDel = (byte)delimiter.charAt(0);
+         } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') {
+             switch (delimiter.charAt(1)) {
+             case 't':
+                 this.fieldDel = (byte)'\t';
+                 break;
+ 
+             case 'x':
+             case 'u':
+                 this.fieldDel =
+                     Integer.valueOf(delimiter.substring(2)).byteValue();
+                 break;
+ 
+             default:                
+                 throw new RuntimeException("Unknown delimiter " + delimiter);
+             }
+         } else {            
+             throw new RuntimeException("PigStorage delimeter must be a single 
character");
+         }
+     }
+ 
+     ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
+ 
+     @Override
+     public void putNext(Tuple f) throws IOException {
+         int sz = f.size();
+         for (int i = 0; i < sz; i++) {
+             Object field;
+             try {
+                 field = f.get(i);
+             } catch (ExecException ee) {
+                 throw ee;
+             }
+ 
+             putField(mOut, field);
+ 
+             if (i != sz - 1) {
+                 mOut.write(fieldDel);
+             }
+         }
+         Text text = new Text(mOut.toByteArray());
+         try {
+             writer.write(null, text);
+             mOut.reset();
+         } catch (InterruptedException e) {
+             throw new IOException(e);
+         }
+     }
+ 
+     @SuppressWarnings("unchecked")
+     private void putField(Object field) throws IOException {
+         //string constants for each delimiter
+         String tupleBeginDelim = "(";
+         String tupleEndDelim = ")";
+         String bagBeginDelim = "{";
+         String bagEndDelim = "}";
+         String mapBeginDelim = "[";
+         String mapEndDelim = "]";
+         String fieldDelim = ",";
+         String mapKeyValueDelim = "#";
+ 
+         switch (DataType.findType(field)) {
+         case DataType.NULL:
+             break; // just leave it empty
+ 
+         case DataType.BOOLEAN:
+             mOut.write(((Boolean)field).toString().getBytes());
+             break;
+ 
+         case DataType.INTEGER:
+             mOut.write(((Integer)field).toString().getBytes());
+             break;
+ 
+         case DataType.LONG:
+             mOut.write(((Long)field).toString().getBytes());
+             break;
+ 
+         case DataType.FLOAT:
+             mOut.write(((Float)field).toString().getBytes());
+             break;
+ 
+         case DataType.DOUBLE:
+             mOut.write(((Double)field).toString().getBytes());
+             break;
+ 
+         case DataType.BYTEARRAY: {
+             byte[] b = ((DataByteArray)field).get();
+             mOut.write(b, 0, b.length);
+             break;
+                                  }
+ 
+         case DataType.CHARARRAY:
+             // oddly enough, writeBytes writes a string
+             mOut.write(((String)field).getBytes(UTF8));
+             break;
+ 
+         case DataType.MAP:
+             boolean mapHasNext = false;
+             Map<String, Object> m = (Map<String, Object>)field;
+             mOut.write(mapBeginDelim.getBytes(UTF8));
+             for(Map.Entry<String, Object> e: m.entrySet()) {
+                 if(mapHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     mapHasNext = true;
+                 }
+                 putField(e.getKey());
+                 mOut.write(mapKeyValueDelim.getBytes(UTF8));
+                 putField(e.getValue());
+             }
+             mOut.write(mapEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.TUPLE:
+             boolean tupleHasNext = false;
+             Tuple t = (Tuple)field;
+             mOut.write(tupleBeginDelim.getBytes(UTF8));
+             for(int i = 0; i < t.size(); ++i) {
+                 if(tupleHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     tupleHasNext = true;
+                 }
+                 try {
+                     putField(t.get(i));
+                 } catch (ExecException ee) {
+                     throw ee;
+                 }
+             }
+             mOut.write(tupleEndDelim.getBytes(UTF8));
+             break;
+ 
+         case DataType.BAG:
+             boolean bagHasNext = false;
+             mOut.write(bagBeginDelim.getBytes(UTF8));
+             Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
+             while(tupleIter.hasNext()) {
+                 if(bagHasNext) {
+                     mOut.write(fieldDelim.getBytes(UTF8));
+                 } else {
+                     bagHasNext = true;
+                 }
+                 putField((Object)tupleIter.next());
+             }
+             mOut.write(bagEndDelim.getBytes(UTF8));
+             break;
+             
+         default: {
+             int errCode = 2108;
+             String msg = "Could not determine data type of field: " + field;
+             throw new ExecException(msg, errCode, PigException.BUG);
+         }
+         
+         }
+     }
+ 
+     @Override
+     public OutputFormat getOutputFormat() {
+         return new TextOutputFormat<WritableComparable, Text>();
+     }
+ 
+     @Override
+     public void prepareToWrite(RecordWriter writer) {
+         this.writer = writer;        
+     }
+ 
+     @Override
+     public void setStoreLocation(String location, Job job) throws IOException 
{
+         job.getConfiguration().set("mapred.textoutputformat.separator", "");
+         FileOutputFormat.setOutputPath(job, new Path(location));
+         if (location.endsWith(".bz2")) {
+             FileOutputFormat.setCompressOutput(job, true);
+             FileOutputFormat.setOutputCompressorClass(job,  BZip2Codec.class);
+         }  else if (location.endsWith(".gz")) {
+             FileOutputFormat.setCompressOutput(job, true);
+             FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+         }
+     }
+ 
+     @Override
+     public void checkSchema(ResourceSchema s) throws IOException {
+         // nothing to do
+     }
+ 
+     @Override
+     public String relToAbsPathForStoreLocation(String location, Path curDir)
+             throws IOException {
+         return LoadFunc.getAbsolutePath(location, curDir);
+     }
+ 
+     @Override
+     public void setStoreFuncUDFContextSignature(String signature) {
+         // nothing to do
+     }
+ 
+ }
  
  }}}
  

Reply via email to