Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "LoadStoreMigrationGuide" page has been changed by PradeepKamath. http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=13&rev2=14 -------------------------------------------------- public SimpleTextStorer() {} - /** - * Constructs a Pig storer that uses specified character as a field delimiter. - * - * @param delimiter - * the single byte character that is used to separate fields. - * ("\t" is the default.) - */ public SimpleTextStorer(String delimiter) { this(); if (delimiter.length() == 1) { @@ -460, +453 @@ === New Implementation === {{{ + public class SimpleTextStorer implements StoreFunc { + protected RecordWriter writer = null; + + private byte fieldDel = '\t'; + private static final int BUFFER_SIZE = 1024; + + public PigStorage() { + } + + public PigStorage(String delimiter) { + this(); + if (delimiter.length() == 1) { + this.fieldDel = (byte)delimiter.charAt(0); + } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') { + switch (delimiter.charAt(1)) { + case 't': + this.fieldDel = (byte)'\t'; + break; + + case 'x': + case 'u': + this.fieldDel = + Integer.valueOf(delimiter.substring(2)).byteValue(); + break; + + default: + throw new RuntimeException("Unknown delimiter " + delimiter); + } + } else { + throw new RuntimeException("PigStorage delimeter must be a single character"); + } + } + + ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE); + + @Override + public void putNext(Tuple f) throws IOException { + int sz = f.size(); + for (int i = 0; i < sz; i++) { + Object field; + try { + field = f.get(i); + } catch (ExecException ee) { + throw ee; + } + + putField(mOut, field); + + if (i != sz - 1) { + mOut.write(fieldDel); + } + } + Text text = new Text(mOut.toByteArray()); + try { + writer.write(null, text); + mOut.reset(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private void putField(Object field) throws IOException { + //string constants for each delimiter + String tupleBeginDelim = "("; + String tupleEndDelim = ")"; + String bagBeginDelim = "{"; + String bagEndDelim = "}"; + String mapBeginDelim = "["; + String mapEndDelim = "]"; + String fieldDelim = ","; + String mapKeyValueDelim = "#"; + + switch (DataType.findType(field)) { + case DataType.NULL: + break; // just leave it empty + + case DataType.BOOLEAN: + mOut.write(((Boolean)field).toString().getBytes()); + break; + + case DataType.INTEGER: + mOut.write(((Integer)field).toString().getBytes()); + break; + + case DataType.LONG: + mOut.write(((Long)field).toString().getBytes()); + break; + + case DataType.FLOAT: + mOut.write(((Float)field).toString().getBytes()); + break; + + case DataType.DOUBLE: + mOut.write(((Double)field).toString().getBytes()); + break; + + case DataType.BYTEARRAY: { + byte[] b = ((DataByteArray)field).get(); + mOut.write(b, 0, b.length); + break; + } + + case DataType.CHARARRAY: + // oddly enough, writeBytes writes a string + mOut.write(((String)field).getBytes(UTF8)); + break; + + case DataType.MAP: + boolean mapHasNext = false; + Map<String, Object> m = (Map<String, Object>)field; + mOut.write(mapBeginDelim.getBytes(UTF8)); + for(Map.Entry<String, Object> e: m.entrySet()) { + if(mapHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + mapHasNext = true; + } + putField(e.getKey()); + mOut.write(mapKeyValueDelim.getBytes(UTF8)); + putField(e.getValue()); + } + mOut.write(mapEndDelim.getBytes(UTF8)); + break; + + case DataType.TUPLE: + boolean tupleHasNext = false; + Tuple t = (Tuple)field; + mOut.write(tupleBeginDelim.getBytes(UTF8)); + for(int i = 0; i < t.size(); ++i) { + if(tupleHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + tupleHasNext = true; + } + try { + putField(t.get(i)); + } catch (ExecException ee) { + throw ee; + } + } + mOut.write(tupleEndDelim.getBytes(UTF8)); + break; + + case DataType.BAG: + boolean bagHasNext = false; + mOut.write(bagBeginDelim.getBytes(UTF8)); + Iterator<Tuple> tupleIter = ((DataBag)field).iterator(); + while(tupleIter.hasNext()) { + if(bagHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + bagHasNext = true; + } + putField((Object)tupleIter.next()); + } + mOut.write(bagEndDelim.getBytes(UTF8)); + break; + + default: { + int errCode = 2108; + String msg = "Could not determine data type of field: " + field; + throw new ExecException(msg, errCode, PigException.BUG); + } + + } + } + + @Override + public OutputFormat getOutputFormat() { + return new TextOutputFormat<WritableComparable, Text>(); + } + + @Override + public void prepareToWrite(RecordWriter writer) { + this.writer = writer; + } + + @Override + public void setStoreLocation(String location, Job job) throws IOException { + job.getConfiguration().set("mapred.textoutputformat.separator", ""); + FileOutputFormat.setOutputPath(job, new Path(location)); + if (location.endsWith(".bz2")) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); + } else if (location.endsWith(".gz")) { + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); + } + } + + @Override + public void checkSchema(ResourceSchema s) throws IOException { + // nothing to do + } + + @Override + public String relToAbsPathForStoreLocation(String location, Path curDir) + throws IOException { + return LoadFunc.getAbsolutePath(location, curDir); + } + + @Override + public void setStoreFuncUDFContextSignature(String signature) { + // nothing to do + } + + } }}}
