Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "LoadStoreMigrationGuide" page has been changed by PradeepKamath. http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=12&rev2=13 -------------------------------------------------- An example of how a simple !LoadFunc implementation based on the old interface can be converted to the new interfaces is shown in the Examples section below. + = StoreFunc Migration = + The main change is that the new !StoreFunc API is based on an !OutputFormat to write the data. Implementations can choose to use an existing !OutputFormat such as !TextOutputFormat or implement a new one. + + An example of how a simple !StoreFunc implementation based on the old interface can be converted to the new interfaces is shown in the Examples section below. = Examples = @@ -159, +163 @@ private TupleFactory mTupleFactory = TupleFactory.getInstance(); private static final int BUFFER_SIZE = 1024; - public PigStorage() { + public SimpleTextLoader() { } /** @@ -169, +173 @@ * the single byte character that is used to separate fields. * ("\t" is the default.) */ - public PigStorage(String delimiter) { + public SimpleTextLoader(String delimiter) { this(); if (delimiter.length() == 1) { this.fieldDel = (byte)delimiter.charAt(0); @@ -263, +267 @@ The storer implementation in the example is a storer for text data with '\n' as the line delimiter and '\t' as the default field delimiter (which can be overridden by passing a different field delimiter in the constructor) - this is similar to the current !PigStorage storer in Pig. The new implementation uses an existing Hadoop-supported !OutputFormat - !TextOutputFormat - as the underlying !OutputFormat. === Old Implementation === + {{{ + public class SimpleTextStorer implements StoreFunc { + + protected byte recordDel = '\n'; + protected byte fieldDel = '\t'; + + protected static final String UTF8 = "UTF-8"; + + public SimpleTextStorer() {} + /** + * Constructs a Pig storer that uses specified character as a field delimiter. 
+ * + * @param delimiter + * the single byte character that is used to separate fields. + * ("\t" is the default.) + */ + public SimpleTextStorer(String delimiter) { + this(); + if (delimiter.length() == 1) { + this.fieldDel = (byte)delimiter.charAt(0); + } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') { + switch (delimiter.charAt(1)) { + case 't': + this.fieldDel = (byte)'\t'; + break; + + case 'x': + case 'u': + this.fieldDel = + Integer.valueOf(delimiter.substring(2)).byteValue(); + break; + + default: + throw new RuntimeException("Unknown delimiter " + delimiter); + } + } else { + throw new RuntimeException("PigStorage delimeter must be a single character"); + } + } + + OutputStream mOut; + public void bindTo(OutputStream os) throws IOException { + mOut = os; + } + + @SuppressWarnings("unchecked") + private void putField(Object field) throws IOException { + //string constants for each delimiter + String tupleBeginDelim = "("; + String tupleEndDelim = ")"; + String bagBeginDelim = "{"; + String bagEndDelim = "}"; + String mapBeginDelim = "["; + String mapEndDelim = "]"; + String fieldDelim = ","; + String mapKeyValueDelim = "#"; + + switch (DataType.findType(field)) { + case DataType.NULL: + break; // just leave it empty + + case DataType.BOOLEAN: + mOut.write(((Boolean)field).toString().getBytes()); + break; + + case DataType.INTEGER: + mOut.write(((Integer)field).toString().getBytes()); + break; + + case DataType.LONG: + mOut.write(((Long)field).toString().getBytes()); + break; + + case DataType.FLOAT: + mOut.write(((Float)field).toString().getBytes()); + break; + + case DataType.DOUBLE: + mOut.write(((Double)field).toString().getBytes()); + break; + + case DataType.BYTEARRAY: { + byte[] b = ((DataByteArray)field).get(); + mOut.write(b, 0, b.length); + break; + } + + case DataType.CHARARRAY: + // oddly enough, writeBytes writes a string + mOut.write(((String)field).getBytes(UTF8)); + break; + + case DataType.MAP: + boolean mapHasNext = false; 
+ Map<String, Object> m = (Map<String, Object>)field; + mOut.write(mapBeginDelim.getBytes(UTF8)); + for(Map.Entry<String, Object> e: m.entrySet()) { + if(mapHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + mapHasNext = true; + } + putField(e.getKey()); + mOut.write(mapKeyValueDelim.getBytes(UTF8)); + putField(e.getValue()); + } + mOut.write(mapEndDelim.getBytes(UTF8)); + break; + + case DataType.TUPLE: + boolean tupleHasNext = false; + Tuple t = (Tuple)field; + mOut.write(tupleBeginDelim.getBytes(UTF8)); + for(int i = 0; i < t.size(); ++i) { + if(tupleHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + tupleHasNext = true; + } + try { + putField(t.get(i)); + } catch (ExecException ee) { + throw ee; + } + } + mOut.write(tupleEndDelim.getBytes(UTF8)); + break; + + case DataType.BAG: + boolean bagHasNext = false; + mOut.write(bagBeginDelim.getBytes(UTF8)); + Iterator<Tuple> tupleIter = ((DataBag)field).iterator(); + while(tupleIter.hasNext()) { + if(bagHasNext) { + mOut.write(fieldDelim.getBytes(UTF8)); + } else { + bagHasNext = true; + } + putField((Object)tupleIter.next()); + } + mOut.write(bagEndDelim.getBytes(UTF8)); + break; + + default: { + int errCode = 2108; + String msg = "Could not determine data type of field: " + field; + throw new ExecException(msg, errCode, PigException.BUG); + } + + } + } + + public void putNext(Tuple f) throws IOException { + // I have to convert integer fields to string, and then to bytes. + // If I use a DataOutputStream to convert directly from integer to + // bytes, I don't get a string representation. + int sz = f.size(); + for (int i = 0; i < sz; i++) { + Object field; + try { + field = f.get(i); + } catch (ExecException ee) { + throw ee; + } + + putField(field); + + if (i == sz - 1) { + // last field in tuple. 
+ mOut.write(recordDel); + } else { + mOut.write(fieldDel); + } + } + } + + public void finish() throws IOException { + } + + /* (non-Javadoc) + * @see org.apache.pig.StoreFunc#getStorePreparationClass() + */ + + public Class getStorePreparationClass() throws IOException { + return null; + } + + } + }}} === New Implementation === + {{{ + + }}} +
