Author: gates Date: Tue Jun 3 16:48:02 2008 New Revision: 662921 URL: http://svn.apache.org/viewvc?rev=662921&view=rev Log: PIG-160 Pradeep's patch to implement LoadFunc changes in the built-in loader functions.
Added: incubator/pig/branches/types/src/org/apache/pig/builtin/Utf8StorageConverter.java incubator/pig/branches/types/test/org/apache/pig/test/TestConversions.java Modified: incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java Modified: incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java Tue Jun 3 16:48:02 2008 @@ -71,13 +71,6 @@ */ public Tuple getNext() throws IOException; - /** - * Cast data from bytes to boolean value. - * @param bytes byte array to be cast. - * @return Boolean value. - * @throws IOException if the value cannot be cast. - */ - public Boolean bytesToBoolean(byte[] b) throws IOException; /** * Cast data from bytes to integer value. 
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Jun 3 16:48:02 2008 @@ -18,6 +18,7 @@ package org.apache.pig.builtin; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -111,48 +112,91 @@ } public DataBag bytesToBag(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Boolean bytesToBoolean(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (DataBag)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public String bytesToCharArray(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (String)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Double bytesToDouble(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Double)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Float bytesToFloat(byte[] b) 
throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Float)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Integer bytesToInteger(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Integer)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Long bytesToLong(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Long)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Map<Object, Object> bytesToMap(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Map<Object, Object>)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Tuple bytesToTuple(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b)); + try { + return (Tuple)DataReaderWriter.readDatum(dis); + } catch (ExecException ee) { + IOException oughtToBeEE = new IOException(); + ee.initCause(ee); + throw oughtToBeEE; + } } public Schema determineSchema(URL fileName) throws IOException { Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Jun 3 16:48:02 2008 @@ -27,7 +27,6 @@ import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -41,7 +40,8 @@ * delimiter is given as a regular expression. See String.split(delimiter) and * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information. */ -public class PigStorage implements LoadFunc, StoreFunc { +public class PigStorage extends Utf8StorageConverter + implements LoadFunc, StoreFunc { protected BufferedPositionedInputStream in = null; long end = Long.MAX_VALUE; @@ -49,7 +49,6 @@ private byte fieldDel = '\t'; private ByteArrayOutputStream mBuf = null; private ArrayList<Object> mProtoTuple = null; - private TupleFactory mTupleFactory = TupleFactory.getInstance(); public PigStorage() { } @@ -195,59 +194,12 @@ mBuf.reset(); } - public DataBag bytesToBag(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Boolean bytesToBoolean(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public String bytesToCharArray(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Double bytesToDouble(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Float bytesToFloat(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Integer 
bytesToInteger(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Long bytesToLong(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Map<Object, Object> bytesToMap(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - - public Tuple bytesToTuple(byte[] b) throws IOException { - // TODO Auto-generated method stub - return null; - } - public Schema determineSchema(URL fileName) throws IOException { - // TODO Auto-generated method stub return null; } public void fieldsToRead(Schema schema) { - // TODO Auto-generated method stub - + // do nothing } } Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Jun 3 16:48:02 2008 @@ -46,7 +46,7 @@ this.in = in; this.end = end; // Since we are not block aligned we throw away the first - // record and cound on a different instance to read it + // record and count on a different instance to read it if (offset != 0) getNext(); } @@ -71,6 +71,7 @@ /** * TextLoader does not support conversion to Integer + * @throws IOException if the value cannot be cast. */ public Integer bytesToInteger(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Integer"); @@ -78,6 +79,7 @@ /** * TextLoader does not support conversion to Long + * @throws IOException if the value cannot be cast. 
*/ public Long bytesToLong(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Long"); @@ -85,6 +87,7 @@ /** * TextLoader does not support conversion to Float + * @throws IOException if the value cannot be cast. */ public Float bytesToFloat(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Float"); @@ -92,6 +95,7 @@ /** * TextLoader does not support conversion to Double + * @throws IOException if the value cannot be cast. */ public Double bytesToDouble(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Double"); @@ -109,6 +113,7 @@ /** * TextLoader does not support conversion to Map + * @throws IOException if the value cannot be cast. */ public Map<Object, Object> bytesToMap(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Map"); @@ -116,6 +121,7 @@ /** * TextLoader does not support conversion to Tuple + * @throws IOException if the value cannot be cast. */ public Tuple bytesToTuple(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Tuple"); @@ -123,6 +129,7 @@ /** * TextLoader does not support conversion to Bag + * @throws IOException if the value cannot be cast. 
*/ public DataBag bytesToBag(byte[] b) throws IOException { throw new IOException("TextLoader does not support conversion to Bag"); Added: incubator/pig/branches/types/src/org/apache/pig/builtin/Utf8StorageConverter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=662921&view=auto ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/builtin/Utf8StorageConverter.java (added) +++ incubator/pig/branches/types/src/org/apache/pig/builtin/Utf8StorageConverter.java Tue Jun 3 16:48:02 2008 @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.builtin; + +import java.awt.image.VolatileImage; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; +import org.apache.hadoop.io.DataOutputBuffer; + +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.data.BagFactory; + + +/** + * This abstract class provides standard conversions between utf8 encoded data + * and pig data types. It is intended to be extended by load and store + * functions (such as PigStorage). + */ +abstract public class Utf8StorageConverter { + + protected BagFactory mBagFactory = BagFactory.getInstance(); + protected TupleFactory mTupleFactory = TupleFactory.getInstance(); + protected final Log mLog = LogFactory.getLog(getClass()); + + public Utf8StorageConverter() { + } + + public DataBag bytesToBag(byte[] b) throws IOException { + //TODO:FIXME + return null; + } + + public String bytesToCharArray(byte[] b) throws IOException { + return new String(b); + } + + public Double bytesToDouble(byte[] b) throws IOException { + try { + return Double.valueOf(new String(b)); + } catch (NumberFormatException nfe) { + mLog.warn("Unable to interpret value " + b + " in field being " + + "converted to double, caught NumberFormatException <" + + nfe.getMessage() + "> field discarded"); + return null; + } + } + + public Float bytesToFloat(byte[] b) throws IOException { + try { + return Float.valueOf(new String(b)); + } catch (NumberFormatException nfe) { + mLog.warn("Unable to interpret value " + b + " in field being " + + "converted to float, caught 
NumberFormatException <" + + nfe.getMessage() + "> field discarded"); + return null; + } + } + + public Integer bytesToInteger(byte[] b) throws IOException { + try { + return Integer.valueOf(new String(b)); + } catch (NumberFormatException nfe) { + mLog.warn("Unable to interpret value " + b + " in field being " + + "converted to int, caught NumberFormatException <" + + nfe.getMessage() + "> field discarded"); + return null; + } + } + + public Long bytesToLong(byte[] b) throws IOException { + try { + return Long.valueOf(new String(b)); + } catch (NumberFormatException nfe) { + mLog.warn("Unable to interpret value " + b + " in field being " + + "converted to long, caught NumberFormatException <" + + nfe.getMessage() + "> field discarded"); + return null; + } + } + + public Map<Object, Object> bytesToMap(byte[] b) throws IOException { + //TODO:FIXME + return null; + } + + public Tuple bytesToTuple(byte[] b) throws IOException { + return bytesToTuple(b, 0, b.length - 1); + } + + private Tuple bytesToTuple(byte[] b, int start, int end) throws IOException { + //TODO:FIXME + return null; + } + + + public byte[] toBytes(DataBag bag) throws IOException { + //TODO:FIXME + throw new IOException("Conversion from Bag to bytes not supported"); + } + + public byte[] toBytes(String s) throws IOException { + return s.getBytes(); + } + + public byte[] toBytes(Double d) throws IOException { + return d.toString().getBytes(); + } + + public byte[] toBytes(Float f) throws IOException { + return f.toString().getBytes(); + } + + public byte[] toBytes(Integer i) throws IOException { + return i.toString().getBytes(); + } + + public byte[] toBytes(Long l) throws IOException { + return l.toString().getBytes(); + } + + public byte[] toBytes(Map<Object, Object> m) throws IOException { + //TODO:FIXME + throw new IOException("Conversion from Map to bytes not supported"); + } + + public byte[] toBytes(Tuple t) throws IOException { + //TODO:FIXME + throw new IOException("Conversion from Tuple to 
bytes not supported"); + } + + +} Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java Tue Jun 3 16:48:02 2008 @@ -47,6 +47,20 @@ } /** + * Construct a byte array using a portion of the provided bytes as content. + * @param b byte array to read from. A copy of the underlying bytes will be + * made. + * @param start starting point to copy from + * @param end ending point to copy to, exclusive. + */ + public DataByteArray(byte[] b, int start, int end) { + mData = new byte[end - start]; + for (int i = start; i < end; i++) { + mData[i - start] = b[i]; + } + } + + /** * Construct a byte array from a String. The contents of the string * are copied. * @param s String to make a byte array out of. 
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java?rev=662921&r1=662920&r2=662921&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java Tue Jun 3 16:48:02 2008 @@ -740,62 +740,5 @@ return res; } - public Result getNext(Boolean b) throws ExecException { - PhysicalOperator in = inputs.get(0); - Byte resultType = in.getResultType(); - switch(resultType) { - - case DataType.MAP : - - case DataType.TUPLE : - - case DataType.BAG : - - case DataType.BYTEARRAY : { - DataByteArray dba = null; - Result res = in.getNext(dba); - if(res.returnStatus == POStatus.STATUS_OK) { - //res.result = new String(((DataByteArray)res.result).toString()); - dba = (DataByteArray) res.result; - try { - res.result = load.bytesToBoolean(dba.get()); - } catch (IOException e) { - log.error("Error while casting from ByteArray to Boolean"); - } - } - return res; - } - - case DataType.INTEGER : - - case DataType.DOUBLE : - - case DataType.LONG : - - case DataType.FLOAT : - - case DataType.CHARARRAY : { - String str = null; - Result res = in.getNext(str); - if(res.returnStatus == POStatus.STATUS_OK) { - if(((String)res.result).length() > 0) - res.result = new Boolean(true); - else res.result = new Boolean(false); - } - return res; - - } - - case DataType.BOOLEAN : { - Result res = in.getNext(b); - return res; - } - } - Result res = new Result(); - res.returnStatus = POStatus.STATUS_ERR; - return res; - } - - } Added: incubator/pig/branches/types/test/org/apache/pig/test/TestConversions.java URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestConversions.java?rev=662921&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestConversions.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestConversions.java Tue Jun 3 16:48:02 2008 @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import java.io.IOException; + +import org.apache.pig.builtin.PigStorage; + +import org.junit.Test; + +import junit.framework.TestCase; + +/** + * Test class to test conversions from bytes to types + * and vice versa + * + */ +public class TestConversions extends TestCase { + + PigStorage ps = new PigStorage(); + + @Test + public void testBytesToInteger() throws IOException + { + // valid ints + String[] a = {"1", "-2345", "1234567"}; + Integer[] ia = {1, -2345, 1234567}; + + for (int i = 0; i < ia.length; i++) { + byte[] b = a[i].getBytes(); + assertEquals(ia[i], ps.bytesToInteger(b)); + } + + // invalid ints + a = new String[]{"1.1", "-23.45", "1234567890123456", "This is an int"}; + for (String s : a) { + byte[] b = s.getBytes(); + Integer i = ps.bytesToInteger(b); + assertEquals(null, i); + } + } + + @Test + public void testBytesToFloat() throws IOException + { + // valid floats + String[] a = {"1", "-2.345", "12.12334567", "1.02e-2",".23344", "23.1234567897"}; + Float[] f = {1f, -2.345f, 12.12334567f, 1.02e-2f,.23344f, 23.1234567f}; // last case is a truncation case + for (int j = 0; j < f.length; j++) { + byte[] b = a[j].getBytes(); + assertEquals(f[j], ps.bytesToFloat(b)); + } + + // invalid floats + a = new String[]{"1a.1", "23.1234567a890123456", "This is a float"}; + for (String s : a) { + byte[] b = s.getBytes(); + Float fl = ps.bytesToFloat(b); + assertEquals(null, fl); + + } + } + + @Test + public void testBytesToDouble() throws IOException + { + // valid doubles + String[] a = {"1", "-2.345", "12.12334567890123456", "1.02e12","-.23344"}; + Double[] d = {(double)1, -2.345, 12.12334567890123456, 1.02e12, -.23344}; + for (int j = 0; j < d.length; j++) { + byte[] b = a[j].getBytes(); + assertEquals(d[j], ps.bytesToDouble(b)); + } + + // invalid doubles + a = new String[]{"-0x1.1", "-23a.45", "This is a double"}; + for (String s : a) { + byte[] b = s.getBytes(); + Double dl = ps.bytesToDouble(b); + assertEquals(null, 
dl); + + } + } + + @Test + public void testBytesToLong() throws IOException + { + // valid Longs + String[] a = {"1", "-2345", "123456789012345678"}; + Long[] la = {1L, -2345L, 123456789012345678L}; + + for (int i = 0; i < la.length; i++) { + byte[] b = a[i].getBytes(); + assertEquals(la[i], ps.bytesToLong(b)); + } + + // invalid longs + a = new String[]{"1.1", "-23.45", "This is a long"}; + for (String s : a) { + byte[] b = s.getBytes(); + Long l = ps.bytesToLong(b); + assertEquals(null, l); + } + } + + @Test + public void testBytesToChar() throws IOException + { + // valid Strings + String[] a = {"1", "-2345", "text", "hello\nworld"}; + + for (String s : a) { + byte[] b = s.getBytes(); + assertEquals(s, ps.bytesToCharArray(b)); + } + } + + + +}