Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "LoadStoreMigrationGuide" page has been changed by PradeepKamath. http://wiki.apache.org/pig/LoadStoreMigrationGuide?action=diff&rev1=2&rev2=3 -------------------------------------------------- } } - public long getPosition() throws IOException { - return in.getPosition(); - } - - public long skip(long n) throws IOException { - - long skipped = in.skip(n-1); - prevByte = (byte)in.read(); - if(prevByte == -1) // End of stream. - return skipped; - else - return skipped+1; - } - public Tuple getNext() throws IOException { if (in == null || in.getPosition() > end) { return null; @@ -86, +72 @@ readField(); } else if (b == recordDel) { readField(); - //Tuple t = mTupleFactory.newTuple(mProtoTuple); Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); mProtoTuple = null; return t; @@ -99, +84 @@ } } - public Tuple getSampledTuple() throws IOException { - - if(prevByte == null || prevByte == recordDel) - // prevByte = null when this is called for the first time, in that case bindTo would have already - // called getNext() if it was required. - return getNext(); - - else{ // We are in middle of record. So, we skip this and return the next one. 
- getNext(); - return getNext(); - } - } - public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException { this.in = in; this.end = end; @@ -123, +95 @@ } } - OutputStream mOut; - public void bindTo(OutputStream os) throws IOException { - mOut = os; - } - - private void putField(Object field) throws IOException { - //string constants for each delimiter - String tupleBeginDelim = "("; - String tupleEndDelim = ")"; - String bagBeginDelim = "{"; - String bagEndDelim = "}"; - String mapBeginDelim = "["; - String mapEndDelim = "]"; - String fieldDelim = ","; - String mapKeyValueDelim = "#"; - - switch (DataType.findType(field)) { - case DataType.NULL: - break; // just leave it empty - - case DataType.BOOLEAN: - mOut.write(((Boolean)field).toString().getBytes()); - break; - - case DataType.INTEGER: - mOut.write(((Integer)field).toString().getBytes()); - break; - - case DataType.LONG: - mOut.write(((Long)field).toString().getBytes()); - break; - - case DataType.FLOAT: - mOut.write(((Float)field).toString().getBytes()); - break; - - case DataType.DOUBLE: - mOut.write(((Double)field).toString().getBytes()); - break; - - case DataType.BYTEARRAY: { - byte[] b = ((DataByteArray)field).get(); - mOut.write(b, 0, b.length); - break; - } - - case DataType.CHARARRAY: - // oddly enough, writeBytes writes a string - mOut.write(((String)field).getBytes(UTF8)); - break; - - case DataType.MAP: - boolean mapHasNext = false; - Map<String, Object> m = (Map<String, Object>)field; - mOut.write(mapBeginDelim.getBytes(UTF8)); - for(String s: m.keySet()) { - if(mapHasNext) { - mOut.write(fieldDelim.getBytes(UTF8)); - } else { - mapHasNext = true; - } - putField(s); - mOut.write(mapKeyValueDelim.getBytes(UTF8)); - putField(m.get(s)); - } - mOut.write(mapEndDelim.getBytes(UTF8)); - break; - - case DataType.TUPLE: - boolean tupleHasNext = false; - Tuple t = (Tuple)field; - mOut.write(tupleBeginDelim.getBytes(UTF8)); - for(int i = 0; i < t.size(); 
++i) { - if(tupleHasNext) { - mOut.write(fieldDelim.getBytes(UTF8)); - } else { - tupleHasNext = true; - } - try { - putField(t.get(i)); - } catch (ExecException ee) { - throw ee; - } - } - mOut.write(tupleEndDelim.getBytes(UTF8)); - break; - - case DataType.BAG: - boolean bagHasNext = false; - mOut.write(bagBeginDelim.getBytes(UTF8)); - Iterator<Tuple> tupleIter = ((DataBag)field).iterator(); - while(tupleIter.hasNext()) { - if(bagHasNext) { - mOut.write(fieldDelim.getBytes(UTF8)); - } else { - bagHasNext = true; - } - putField((Object)tupleIter.next()); - } - mOut.write(bagEndDelim.getBytes(UTF8)); - break; - - default: { - int errCode = 2108; - String msg = "Could not determine data type of field: " + field; - throw new ExecException(msg, errCode, PigException.BUG); - } - - } - } - - public void putNext(Tuple f) throws IOException { - // I have to convert integer fields to string, and then to bytes. - // If I use a DataOutputStream to convert directly from integer to - // bytes, I don't get a string representation. - int sz = f.size(); - for (int i = 0; i < sz; i++) { - Object field; - try { - field = f.get(i); - } catch (ExecException ee) { - throw ee; - } - - putField(field); - - if (i == sz - 1) { - // last field in tuple. - mOut.write(recordDel); - } else { - mOut.write(fieldDel); - } - } - } - - public void finish() throws IOException { - } - private void readField() { if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>(); if (mBuf.size() == 0) { // NULL value mProtoTuple.add(null); } else { + - // TODO, once this can take schemas, we need to figure out - // if the user requested this to be viewed as a certain - // type, and if so, then construct it appropriately. byte[] array = mBuf.toByteArray(); - if (array[array.length-1]=='\r' && os==OS_WINDOWS) { - // This is a java 1.6 function. Until pig officially moves to - // 1.6 we can't use this. 
- // array = Arrays.copyOf(array, array.length-1); - byte[] tmp = new byte[array.length - 1]; - for (int i = 0; i < array.length - 1; i++) tmp[i] = array[i]; - array = tmp; - } - if (array.length==0) mProtoTuple.add(null); else @@ -288, +111 @@ mBuf.reset(); } - /* (non-Javadoc) - * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage) - */ public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException { - // TODO Auto-generated method stub return null; } @@ -301, +120 @@ // do nothing } + public RequiredFieldResponse fieldsToRead(RequiredFieldList requiredFieldList) + throws FrontendException { + // indicate to pig that this loader will return all fields and not just + // the required fields passed in the argument. + return new RequiredFieldResponse(false); - public boolean equals(Object obj) { - return equals((PigStorage)obj); - } - - public boolean equals(PigStorage other) { - return this.fieldDel == other.fieldDel; - } - - /* (non-Javadoc) - * @see org.apache.pig.StoreFunc#getStorePreparationClass() - */ - - public Class getStorePreparationClass() throws IOException { - // TODO Auto-generated method stub - return null; } } }}} == New Implementation == + {{{ + public class PigStorage extends LoadFunc { + protected RecordReader in = null; + private byte fieldDel = '\t'; + private ArrayList<Object> mProtoTuple = null; + private TupleFactory mTupleFactory = TupleFactory.getInstance(); + private static final int BUFFER_SIZE = 1024; + + public PigStorage() { + } + + /** + * Constructs a Pig loader that uses specified character as a field delimiter. + * + * @param delimiter + * the single byte character that is used to separate fields. + * ("\t" is the default.) 
+ */ + public PigStorage(String delimiter) { + this(); + if (delimiter.length() == 1) { + this.fieldDel = (byte)delimiter.charAt(0); + } else if (delimiter.length() > 1 && delimiter.charAt(0) == '\\') { + switch (delimiter.charAt(1)) { + case 't': + this.fieldDel = (byte)'\t'; + break; + case 'x': + case 'u': + this.fieldDel = + Integer.valueOf(delimiter.substring(2)).byteValue(); + break; + + default: + throw new RuntimeException("Unknown delimiter " + delimiter); + } + } else { + throw new RuntimeException("PigStorage delimeter must be a single character"); + } + } + + @Override + public Tuple getNext() throws IOException { + try { + boolean notDone = in.nextKeyValue(); + if (!notDone) { + return null; + } + Text value = (Text) in.getCurrentValue(); + byte[] buf = value.getBytes(); + int len = value.getLength(); + int start = 0; + + for (int i = 0; i < len; i++) { + if (buf[i] == fieldDel) { + readField(buf, start, i); + start = i + 1; + } + } + // pick up the last field + readField(buf, start, len); + + Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple); + mProtoTuple = null; + return t; + } catch (InterruptedException e) { + int errCode = 6018; + String errMsg = "Error while reading input"; + throw new ExecException(errMsg, errCode, + PigException.REMOTE_ENVIRONMENT, e); + } + + } + + private void readField(byte[] buf, int start, int end) { + if (mProtoTuple == null) { + mProtoTuple = new ArrayList<Object>(); + } + + if (start == end) { + // NULL value + mProtoTuple.add(null); + } else { + mProtoTuple.add(new DataByteArray(buf, start, end)); + } + } + + @Override + public InputFormat getInputFormat() { + return new TextInputFormat(); + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) { + in = reader; + } + + @Override + public void setLocation(String location, Job job) + throws IOException { + FileInputFormat.setInputPaths(job, location); + } + } + + }}} +
