Author: sms
Date: Fri Mar  6 22:50:52 2009
New Revision: 751121

URL: http://svn.apache.org/viewvc?rev=751121&view=rev
Log:
PIG-544: Utf8StorageConverter.java does not always produce NULLs when data is 
malformed (thejas via sms)

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java
      - copied, changed from r751060, 
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java
Removed:
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Mar  6 22:50:52 2009
@@ -449,3 +449,6 @@
     PIG-577: outer join query looses name information (sms via pradeepkth)
 
     PIG-690: UNION doesn't work in the latest code (pradeepkth via sms)
+
+    PIG-544: Utf8StorageConverter.java does not always produce NULLs when data
+    is malformed(thejas via sms)

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Fri Mar  6 22:50:52 2009
@@ -40,7 +40,7 @@
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.PigCompletor;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.tools.timer.PerformanceTimerFactory;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 
@@ -370,12 +370,12 @@
             rc = 2;
         }
         if(!gruntCalled) {
-               Utils.writeLog(pe, logFileName, log, verbose);
+               LogUtils.writeLog(pe, logFileName, log, verbose);
         }
     } catch (Throwable e) {
         rc = 2;
         if(!gruntCalled) {
-               Utils.writeLog(e, logFileName, log, verbose);
+               LogUtils.writeLog(e, logFileName, log, verbose);
         }
     } finally {
         // clear temp files

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Fri Mar  6 22:50:52 2009
@@ -67,7 +67,7 @@
 import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.pen.ExampleGenerator;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.tools.grunt.GruntParser;
 
 
@@ -311,7 +311,7 @@
                     aliasesMap, opTableMap, aliasOpMap, startLine);
         } catch (ParseException e) {
             //throw (IOException) new IOException(e.getMessage()).initCause(e);
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             int errCode = 1000;
             String msg = "Error during parsing. " + (pe == null? 
e.getMessage() : pe.getMessage());
             throw new FrontendException(msg, errCode, PigException.INPUT, 
false, null, e);

Modified: hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigWarning.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigWarning.java Fri Mar  6 22:50:52 2009
@@ -25,7 +25,7 @@
     ACCESSING_NON_EXISTENT_FIELD,
     DID_NOT_FIND_LOAD_ONLY_MAP_PLAN,
     DIVIDE_BY_ZERO,
-    FIELD_DISCARDED,
+    FIELD_DISCARDED_TYPE_CONVERSION_FAILED,
     GROUP_BY_INCOMPATIBLE_TYPES,
     IMPLICIT_CAST_TO_BAG,
     IMPLICIT_CAST_TO_CHARARRAY,

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
 Fri Mar  6 22:50:52 2009
@@ -47,7 +47,7 @@
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 
 public abstract class Launcher {
     private static final Log log = LogFactory.getLog(Launcher.class);
@@ -190,7 +190,7 @@
                 if(exceptions.size() > 1) {
                     for(int j = 0; j < exceptions.size(); ++j) {
                         String headerMessage = "Error message from task (" + 
type + ") " + reports[i].getTaskID();
-                        Utils.writeLog(exceptions.get(j), 
pigContext.getProperties().getProperty("pig.logfile"), log, false, 
headerMessage, false, false);
+                        LogUtils.writeLog(exceptions.get(j), 
pigContext.getProperties().getProperty("pig.logfile"), log, false, 
headerMessage, false, false);
                     }
                     throw exceptions.get(0);
                 } else if(exceptions.size() == 1) {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri Mar  6 
22:50:52 2009
@@ -29,8 +29,11 @@
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.ReversibleLoadStoreFunc;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -43,6 +46,7 @@
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.WrappedIOException;
 
 
@@ -53,6 +57,7 @@
 
     Iterator<Tuple>     i              = null;
     protected BufferedPositionedInputStream in = null;
+    private static final Log mLog = LogFactory.getLog(BinStorage.class);
     private DataInputStream inData = null;
     protected long                end            = Long.MAX_VALUE;
     
@@ -125,75 +130,115 @@
         t.write(out);
     }
 
-    public DataBag bytesToBag(byte[] b) throws IOException {
+    public DataBag bytesToBag(byte[] b){
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (DataBag)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return DataReaderWriter.bytesToBag(dis);
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to bag, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }        
     }
 
-    public String bytesToCharArray(byte[] b) throws IOException {
+    public String bytesToCharArray(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (String)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return DataReaderWriter.bytesToCharArray(dis);
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to chararray, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 
-    public Double bytesToDouble(byte[] b) throws IOException {
+    public Double bytesToDouble(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Double)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return new Double(dis.readDouble());
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to double, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 
-    public Float bytesToFloat(byte[] b) throws IOException {
+    public Float bytesToFloat(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Float)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return new Float(dis.readFloat());
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to float, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+            
+            return null;
         }
     }
 
-    public Integer bytesToInteger(byte[] b) throws IOException {
+    public Integer bytesToInteger(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Integer)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return new Integer(dis.readInt());
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to integer, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 
-    public Long bytesToLong(byte[] b) throws IOException {
+    public Long bytesToLong(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Long)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return new Long(dis.readLong());
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to long, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 
-    public Map<Object, Object> bytesToMap(byte[] b) throws IOException {
+    public Map<Object, Object> bytesToMap(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Map<Object, Object>)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return DataReaderWriter.bytesToMap(dis);
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to map, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 
-    public Tuple bytesToTuple(byte[] b) throws IOException {
+    public Tuple bytesToTuple(byte[] b) {
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b));
         try {
-            return (Tuple)DataReaderWriter.readDatum(dis);
-        } catch (ExecException ee) {
-            throw ee;
+            return DataReaderWriter.bytesToTuple(dis);
+        } catch (IOException e) {
+            LogUtils.warn(this, "Unable to convert bytearray to tuple, " +
+                    "caught IOException <" + e.getMessage() + ">",
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
+                    mLog);
+        
+            return null;
         }
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java 
(original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Utf8StorageConverter.java Fri 
Mar  6 22:50:52 2009
@@ -36,6 +36,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.parser.ParseException;
 import org.apache.pig.data.parser.TextDataParser;
+import org.apache.pig.impl.util.LogUtils;
 
 /**
  * This abstract class provides standard conversions between utf8 encoded data
@@ -70,15 +71,24 @@
     public DataBag bytesToBag(byte[] b) throws IOException {
         if(b == null)
             return null;
-        Object o;
+        DataBag db;
         try {
-            o = parseFromBytes(b);
+            db = (DataBag)parseFromBytes(b);
         } catch (ParseException pe) {
-            int errCode = 2110;
-            String msg = "Could not convert bytearray to bag.";
-            throw new ExecException(msg, errCode, PigException.BUG);
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type bag, caught ParseException <" +
+                    pe.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
+        }catch (Exception e){
+            // can happen if parseFromBytes identifies it as being of 
different type
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type bag, caught Exception <" +
+                    e.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
         }
-        return (DataBag)o;
+        return (DataBag)db;
     }
 
     public String bytesToCharArray(byte[] b) throws IOException {
@@ -87,15 +97,16 @@
         return new String(b, "UTF-8");
     }
 
-    public Double bytesToDouble(byte[] b) throws IOException {
+    public Double bytesToDouble(byte[] b) {
         if(b == null)
             return null;
         try {
             return Double.valueOf(new String(b));
         } catch (NumberFormatException nfe) {
-            warn("Unable to interpret value " + b + " in field being " +
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
                     "converted to double, caught NumberFormatException <" +
-                    nfe.getMessage() + "> field discarded", 
PigWarning.FIELD_DISCARDED);
+                    nfe.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
             return null;
         }
     }
@@ -115,9 +126,10 @@
         try {
             return Float.valueOf(s);
         } catch (NumberFormatException nfe) {
-            warn("Unable to interpret value " + b + " in field being " +
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
                     "converted to float, caught NumberFormatException <" +
-                    nfe.getMessage() + "> field discarded", 
PigWarning.FIELD_DISCARDED);
+                    nfe.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
             return null;
         }
     }
@@ -137,14 +149,16 @@
                 Double d = Double.valueOf(s);
                 // Need to check for an overflow error
                 if (d.doubleValue() > mMaxInt.doubleValue() + 1.0) {
-                    warn("Value " + d + " too large for integer", 
PigWarning.TOO_LARGE_FOR_INT);
+                    LogUtils.warn(this, "Value " + d + " too large for 
integer", 
+                                PigWarning.TOO_LARGE_FOR_INT, mLog);
                     return null;
                 }
                 return new Integer(d.intValue());
             } catch (NumberFormatException nfe2) {
-                warn("Unable to interpret value " + b + " in field being " +
+                LogUtils.warn(this, "Unable to interpret value " + b + " in 
field being " +
                         "converted to int, caught NumberFormatException <" +
-                        nfe.getMessage() + "> field discarded", 
PigWarning.FIELD_DISCARDED);
+                        nfe.getMessage() + "> field discarded", 
+                        PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
mLog);
                 return null;
             }
         }
@@ -174,14 +188,16 @@
                 Double d = Double.valueOf(s);
                 // Need to check for an overflow error
                 if (d.doubleValue() > mMaxLong.doubleValue() + 1.0) {
-                       warn("Value " + d + " too large for integer", 
PigWarning.TOO_LARGE_FOR_INT);
+                       LogUtils.warn(this, "Value " + d + " too large for 
integer", 
+                                   PigWarning.TOO_LARGE_FOR_INT, mLog);
                     return null;
                 }
                 return new Long(d.longValue());
             } catch (NumberFormatException nfe2) {
-                warn("Unable to interpret value " + b + " in field being " +
-                        "converted to long, caught NumberFormatException <" +
-                        nfe.getMessage() + "> field discarded", 
PigWarning.FIELD_DISCARDED);
+                LogUtils.warn(this, "Unable to interpret value " + b + " in 
field being " +
+                            "converted to long, caught NumberFormatException 
<" +
+                            nfe.getMessage() + "> field discarded", 
+                            PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, 
mLog);
                 return null;
             }
         }
@@ -190,29 +206,49 @@
     public Map<Object, Object> bytesToMap(byte[] b) throws IOException {
         if(b == null)
             return null;
-        Object o;
+        Map<Object, Object> map;
         try {
-            o = parseFromBytes(b);
-        } catch (ParseException pe) {
-            int errCode = 2110;
-            String msg = "Could not convert bytearray to map.";
-            throw new ExecException(msg, errCode, PigException.BUG);
+            map = (Map<Object, Object>)parseFromBytes(b);
+        }
+        catch (ParseException pe) {
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type map, caught ParseException <" +
+                    pe.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
+        }catch (Exception e){
+            // can happen if parseFromBytes identifies it as being of 
different type
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type map, caught Exception <" +
+                    e.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
         }
-        return (Map<Object, Object>)o;
+        return map;
     }
 
     public Tuple bytesToTuple(byte[] b) throws IOException {
         if(b == null)
             return null;
-        Object o;
+        Tuple t;
         try {
-            o = parseFromBytes(b);
-        } catch (ParseException pe) {
-            int errCode = 2110;
-            String msg = "Could not convert bytearray to tuple.";
-            throw new ExecException(msg, errCode, PigException.BUG);
+            t = (Tuple)parseFromBytes(b);
+        } 
+        catch (ParseException pe) {
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type tuple, caught ParseException <" +
+                    pe.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
+        }catch (Exception e){
+            // can happen if parseFromBytes identifies it as being of 
different type
+            LogUtils.warn(this, "Unable to interpret value " + b + " in field 
being " +
+                    "converted to type tuple, caught Exception <" +
+                    e.getMessage() + "> field discarded", 
+                    PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
+            return null;       
         }
-        return (Tuple)o;
+        return t;
     }
 
 
@@ -248,13 +284,6 @@
         return t.toString().getBytes();
     }
     
-    protected void warn(String msg, Enum warningEnum) {
-       pigLogger = PhysicalOperator.getPigLogger();
-       if(pigLogger != null) {
-               pigLogger.warn(this, msg, warningEnum);
-       } else {
-               mLog.warn(msg);
-       }       
-    }
+
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Fri Mar  6 
22:50:52 2009
@@ -38,46 +38,69 @@
     static final int UNSIGNED_SHORT_MAX = 65535;
     static final String UTF8 = "UTF-8";
 
+    public static Tuple bytesToTuple(DataInput in) throws IOException {
+        // Don't use Tuple.readFields, because it requires you to
+        // create a tuple with no size and then append fields.
+        // That's less efficient than allocating the tuple size up
+        // front and then filling in the spaces.
+        // Read the size.
+        int sz = in.readInt();
+        // if sz == 0, we construct an "empty" tuple -
+        // presumably the writer wrote an empty tuple!
+        if (sz < 0) {
+            throw new IOException("Invalid size " + sz + " for a tuple");
+        }
+        Tuple t = mTupleFactory.newTuple(sz);
+        for (int i = 0; i < sz; i++) {
+            t.set(i, readDatum(in));
+        }
+        return t;
+
+    }
+    
+    public static DataBag bytesToBag(DataInput in) throws IOException {
+        DataBag bag = mBagFactory.newDefaultBag();
+        bag.readFields(in);
+        return bag;
+    }
+    
+    public static Map<Object, Object> bytesToMap(DataInput in) throws 
IOException {
+        int size = in.readInt();    
+        Map<Object, Object> m = new HashMap<Object, Object>(size);
+        for (int i = 0; i < size; i++) {
+            Object key = readDatum(in);
+            m.put(key, readDatum(in));
+        }
+        return m;    
+    }
+    
+    public static String bytesToCharArray(DataInput in) throws IOException{
+        int size = in.readUnsignedShort();
+        byte[] ba = new byte[size];
+        in.readFully(ba);
+        return new String(ba, DataReaderWriter.UTF8);
+    }
+
+    public static String bytesToBigCharArray(DataInput in) throws IOException{
+        int size = in.readInt();
+        byte[] ba = new byte[size];
+        in.readFully(ba);
+        return new String(ba, DataReaderWriter.UTF8);
+    }
+    
+        
     public static Object readDatum(DataInput in) throws IOException, 
ExecException {
         // Read the data type
         byte b = in.readByte();
         switch (b) {
-            case DataType.TUPLE: {
-                
-                // Don't use Tuple.readFields, because it requires you to
-                // create a tuple with no size and then append fields.
-                // That's less efficient than allocating the tuple size up
-                // front and then filling in the spaces.
-                // Read the size.
-                int sz = in.readInt();
-                // if sz == 0, we construct an "empty" tuple -
-                // presumably the writer wrote an empty tuple!
-                if (sz < 0) {
-                    throw new IOException("Invalid size " + sz +
-                        " for a tuple");
-                }
-                Tuple t = mTupleFactory.newTuple(sz);
-                for (int i = 0; i < sz; i++) {
-                    t.set(i, readDatum(in));
-                }
-                return t;
-                                 }
-
-            case DataType.BAG: {
-                DataBag bag = mBagFactory.newDefaultBag();
-                bag.readFields(in);
-                return bag;
-                               }
+            case DataType.TUPLE: 
+                return bytesToTuple(in);
+            
+            case DataType.BAG: 
+                return bytesToBag(in);
 
-            case DataType.MAP: {
-                int size = in.readInt();
-                Map<Object, Object> m = new HashMap<Object, Object>(size);
-                for (int i = 0; i < size; i++) {
-                    Object key = readDatum(in);
-                    m.put(key, readDatum(in));
-                }
-                return m;
-                               }
+            case DataType.MAP: 
+                return bytesToMap(in);    
 
             case DataType.INTEGER:
                 return new Integer(in.readInt());
@@ -104,21 +127,13 @@
                 return new DataByteArray(ba);
                                      }
 
-            case DataType.BIGCHARARRAY: {
-                int size = in.readInt();
-                byte[] ba = new byte[size];
-                in.readFully(ba);
-               return new String(ba, DataReaderWriter.UTF8);
-            }
-
-            case DataType.CHARARRAY: {
-                int size = in.readUnsignedShort();
-                byte[] ba = new byte[size];
-                in.readFully(ba);
-               return new String(ba, DataReaderWriter.UTF8);
-            }
-
-
+            case DataType.BIGCHARARRAY: 
+                return bytesToBigCharArray(in);
+            
+
+            case DataType.CHARARRAY: 
+                return bytesToCharArray(in);
+            
             case DataType.NULL:
                 return null;
 

Copied: hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java (from 
r751060, hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java)
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java?p2=hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java&p1=hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java&r1=751060&r2=751121&rev=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Utils.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/LogUtils.java Fri Mar  6 
22:50:52 2009
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.tools.grunt;
+package org.apache.pig.impl.util;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -24,12 +24,28 @@
 import java.io.PrintStream;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigscript.parser.ParseException;
 
-public class Utils {
-    static Exception getPermissionException(Exception top){
+public class LogUtils {
+    
+    public static void warn(Object o, String msg, PigWarning warningEnum, 
+                Log log) {
+        
+        PigLogger pigLogger = PhysicalOperator.getPigLogger();
+        if(pigLogger != null) {
+            pigLogger.warn(o, msg, warningEnum);
+        } else {
+            log.warn(msg); 
+        }           
+    }
+    
+    public static Exception getPermissionException(Exception top){
         Throwable current = top;
 
         while (current != null && (current.getMessage() == null || 
current.getMessage().indexOf("Permission denied") == -1)){
@@ -62,13 +78,13 @@
         String message = null;
         
         if(t instanceof Exception) {
-            Exception pe = Utils.getPermissionException((Exception)t);
+            Exception pe = LogUtils.getPermissionException((Exception)t);
             if (pe != null) {
                 log.error("You don't have permission to perform the operation. 
Error from the server: " + pe.getMessage());
             }
         }
 
-        PigException pigException = Utils.getPigException(t);
+        PigException pigException = LogUtils.getPigException(t);
 
         if(pigException != null) {
             message = "ERROR " + pigException.getErrorCode() + ": " + 
pigException.getMessage();

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/Grunt.java Fri Mar  6 
22:50:52 2009
@@ -35,7 +35,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.tools.pigscript.parser.*;
 import org.apache.pig.impl.logicalLayer.parser.TokenMgrError;
-
+import org.apache.pig.impl.util.LogUtils;
 
 public class Grunt 
 {
@@ -69,7 +69,7 @@
                 parser.parseStopOnError();
                 break;                            
             } catch(Throwable t) {
-                Utils.writeLog(t, 
pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
+                LogUtils.writeLog(t, 
pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
                 parser.ReInit(in);
             }
         }
@@ -81,7 +81,7 @@
             parser.setInteractive(false);
             parser.parseStopOnError();
         } catch (Throwable t) {
-            Utils.writeLog(t, 
pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
+            LogUtils.writeLog(t, 
pig.getPigContext().getProperties().getProperty("pig.logfile"), log, verbose);
             throw (t);
         }
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar  6 
22:50:52 2009
@@ -61,6 +61,7 @@
 import org.apache.pig.tools.pigscript.parser.PigScriptParser;
 import org.apache.pig.tools.pigscript.parser.PigScriptParserTokenManager;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.impl.util.LogUtils;
 
 public class GruntParser extends PigScriptParser {
 
@@ -110,7 +111,7 @@
             }
             catch(Exception e)
             {
-                Exception pe = Utils.getPermissionException(e);
+                Exception pe = LogUtils.getPermissionException(e);
                 if (pe != null)
                     log.error("You don't have permission to perform the 
operation. Error from the server: " + pe.getMessage());
                 else {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java Fri Mar  6 
22:50:52 2009
@@ -38,7 +38,7 @@
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -222,7 +222,7 @@
             Iterator<Tuple> iter = pigServer.openIterator("B");
         } catch(Exception e) {
             exceptionCaused = true;
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Multiple matching functions"));
             assertTrue(msg.contains("{float,double}, {float,long}"));
@@ -307,7 +307,7 @@
             Iterator<Tuple> iter = pigServer.openIterator("B");
         }catch(Exception e) {
             exceptionCaused = true;
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Multiple matching functions"));
             assertTrue(msg.contains("({float,double,long}, 
{float,long,double})"));
@@ -328,7 +328,7 @@
             Iterator<Tuple> iter = pigServer.openIterator("B");
         }catch(Exception e) {
             exceptionCaused = true;
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Multiple matching functions"));
             assertTrue(msg.contains("({float,double,long}, 
{float,long,double})"));
@@ -349,7 +349,7 @@
             Iterator<Tuple> iter = pigServer.openIterator("B");
         }catch(Exception e) {
             exceptionCaused = true;
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Could not infer the matching function"));
         }
@@ -461,7 +461,7 @@
             Iterator<Tuple> iter = pigServer.openIterator("B");
         }catch(Exception e) {
             exceptionCaused = true;
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Multiple matching functions"));
             assertTrue(msg.contains("({float,double,long}, 
{float,long,double}"));
@@ -637,7 +637,7 @@
         try{
             pigServer.openIterator("B");
         }catch (Exception e) {
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertEquals(true,msg.contains("as multiple or none of them fit"));
         }
@@ -691,7 +691,7 @@
         try{
             pigServer.openIterator("B");
         }catch (Exception e) {
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertEquals(true,msg.contains("Multiple matching functions"));
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Mar  6 
22:50:52 2009
@@ -21,6 +21,7 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Iterator;
 import java.util.Random;
@@ -33,6 +34,7 @@
 import org.apache.pig.data.*;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.test.utils.Identity;
+import org.apache.pig.builtin.BinStorage;
 
 import junit.framework.TestCase;
 
@@ -91,5 +93,167 @@
         assertEquals(LOOP_COUNT, numIdentity);
 
     }
+    @Test
+    public void testBinStorageByteArrayCastsSimple() throws IOException {
+        // Test for PIG-544 fix
+        // Tries to read data in BinStorage bytearrays as other pig types,
+        // should return null if the conversion fails.
+        // This test case does not use a practical example; it just tests
+        // whether the conversion happens when the minimum conditions for
+        // conversion, such as the expected number of bytes, are met.
+        String[] input = {
+                    "asdf\t12\t1.1\t231\t234", 
+                    "sa\t1231\t123.4\t12345678\t1234.567",
+                    "asdff\t1232123\t1.45345\t123456789\t123456789.9"
+                    };
+        
+        Util.createInputFile(cluster, "table_bs_ac", input);
+
+        // test with BinStorage
+        pigServer.registerQuery("a = load 'table_bs_ac';");
+        String output = "/pig/out/TestEvalPipeline2_BinStorageByteArrayCasts";
+        pigServer.deleteFile(output);
+        pigServer.store("a", output, BinStorage.class.getName());
+
+        pigServer.registerQuery("b = load '" + output + "' using BinStorage() "
+                + "as (name: int, age: int, gpa: float, lage: long, dgpa: 
double);");
+        
+        Iterator<Tuple> it = pigServer.openIterator("b");
+        
+        Tuple tup=null;
+        
+        // I have separately verified only a few of the successful conversions,
+        // assuming the rest are correct.
+        // It is primarily testing if null is being returned when conversions
+        // are expected to fail
+        
+        //tuple 1 
+        tup = it.next();
+
+        
+        // 1634952294 is the integer whose binary representation is the same
+        // as that of "asdf". The other columns return null because they have
+        // fewer than the number of bytes expected for the corresponding
+        // numeric type's binary representation.
+        assertTrue( (Integer)tup.get(0) == 1634952294); 
+        assertTrue(tup.get(1) == null);
+        assertTrue(tup.get(2) == null);
+        assertTrue(tup.get(3) == null);
+        assertTrue(tup.get(4) == null);
+        
+        //tuple 2 
+        tup = it.next();
+        assertTrue(tup.get(0) == null);
+        assertTrue( (Integer)tup.get(1) == 825373489);
+        assertTrue( (Float)tup.get(2) == 2.5931501E-9F);
+        assertTrue( (Long)tup.get(3) == 3544952156018063160L);
+        assertTrue( (Double)tup.get(4) == 1.030084341992388E-71);
+        
+        //tuple 3
+        tup = it.next();
+        // When the byte array is larger than the required number of bytes for
+        // the given number type, the required bytes from the beginning of the
+        // byte array are used for the conversion. For example, 1634952294
+        // corresponds to the first 4 bytes of the binary string corresponding
+        // to "asdff".
+        assertTrue((Integer)tup.get(0) == 1634952294);
+        assertTrue( (Integer)tup.get(1) == 825373490);
+        assertTrue( (Float)tup.get(2) == 2.5350009E-9F);
+        assertTrue( (Long)tup.get(3) == 3544952156018063160L);
+        assertTrue( (Double)tup.get(4) == 1.0300843656201408E-71);
+        
+        Util.deleteFile(cluster, "table");
+    }
+    @Test
+    public void testBinStorageByteArrayCastsComplexBag() throws IOException {
+        // Test for PIG-544 fix
+        
+        // Tries to read data in BinStorage bytearrays as other pig bags,
+        // should return null if the conversion fails.
+        
+        String[] input = {
+                "{(asdf)}",
+                "{(2344)}",
+                "{(2344}",
+                "{(323423423423434)}",
+                "{(323423423423434L)}",
+                "{(asdff)}"
+        };
+        
+        Util.createInputFile(cluster, "table_bs_ac_clx", input);
+
+        // test with BinStorage
+        pigServer.registerQuery("a = load 'table_bs_ac_clx' as (f1);");
+        pigServer.registerQuery("b = foreach a generate (bag{tuple(int)})f1;");
+        
+        Iterator<Tuple> it = pigServer.openIterator("b");
+        
+        Tuple tup=null;
+
+        //tuple 1 
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+        
+        //tuple 2 
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+        
+        //tuple 3 - malformed
+        tup = it.next();
+        assertTrue(tup.get(0) == null);
+
+        //tuple 4 - integer exceeds size limit
+        tup = it.next();
+        assertTrue(tup.get(0) == null);
+
+        //tuple 5 
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+
+        //tuple 6
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+        
+        Util.deleteFile(cluster, "table_bs_ac_clx");
+    }
+    @Test
+    public void testBinStorageByteArrayCastsComplexTuple() throws IOException {
+        // Test for PIG-544 fix
+        
+        // Tries to read data in BinStorage bytearrays as other pig tuples,
+        // should return null if the conversion fails.
+        
+        String[] input = {
+                "(123)",
+                "((123)",
+                "(123123123123)",
+                "(asdf)"
+        };
+        
+        Util.createInputFile(cluster, "table_bs_ac_clxt", input);
+
+        // test with BinStorage
+        pigServer.registerQuery("a = load 'table_bs_ac_clxt' as 
(t:tuple(t:tuple(i:int)));");
+        Iterator<Tuple> it = pigServer.openIterator("a");
+        
+        Tuple tup=null;
+
+        //tuple 1 
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+        
+        //tuple 2 -malformed tuple
+        tup = it.next();
+        assertTrue(tup.get(0) == null);
+        
+        //tuple 3 - integer exceeds size limit
+        tup = it.next();
+        assertTrue(tup.get(0) == null);
+
+        //tuple 4
+        tup = it.next();
+        assertTrue(tup.get(0) != null);
+
+        Util.deleteFile(cluster, "table_bs_ac_clxt");
+    }
 
+    
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestGrunt.java Fri Mar  6 
22:50:52 2009
@@ -28,7 +28,7 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.grunt.Grunt;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStreamReader;
@@ -110,7 +110,7 @@
         try {
             grunt.exec();
         } catch (Exception e) {
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             String msg = (pe == null? e.getMessage(): pe.getMessage());
             assertTrue(msg.contains("Encountered \" \";"));
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=751121&r1=751120&r2=751121&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java 
(original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri 
Mar  6 22:50:52 2009
@@ -53,7 +53,7 @@
 import org.apache.pig.impl.logicalLayer.parser.ParseException ;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.test.utils.Identity;
-import org.apache.pig.tools.grunt.Utils;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.PigException;
 
 
@@ -1966,13 +1966,13 @@
             // log.error(e);
             //System.err.println("IOException Stack trace for query: " + 
query);
             //e.printStackTrace();
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             fail("IOException: " + (pe == null? e.getMessage(): 
pe.getMessage()));
         } catch (Exception e) {
             log.error(e);
             //System.err.println("Exception Stack trace for query: " + query);
             //e.printStackTrace();
-            PigException pe = Utils.getPigException(e);
+            PigException pe = LogUtils.getPigException(e);
             fail(e.getClass().getName() + ": " + (pe == null? e.getMessage(): 
pe.getMessage()) + " -- " + query);
         }
         return null;


Reply via email to