Author: pradeepkth
Date: Thu Jun  4 17:48:16 2009
New Revision: 781810

URL: http://svn.apache.org/viewvc?rev=781810&view=rev
Log:
PIG-796: support conversion from numeric types to chararray (Ashutosh Chauhan 
via pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPOCast.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=781810&r1=781809&r2=781810&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jun  4 17:48:16 2009
@@ -60,6 +60,9 @@
 
 BUG FIXES
 
+PIG-796: support conversion from numeric types to chararray (Ashutosh Chauhan
+via pradeepkth)
+
 PIG-564: problem with parameter substitution and special charachters (olgan)
 
 PIG-802: PERFORMANCE: not creating bags for ORDER BY (serakesh via olgan)

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=781810&r1=781809&r2=781810&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 Thu Jun  4 17:48:16 2009
@@ -50,6 +50,7 @@
     transient private LoadFunc load;
     private Log log = LogFactory.getLog(getClass());
     private boolean castNotNeeded = false;
+    private Byte realType = null;
 
     private static final long serialVersionUID = 1L;
 
@@ -98,7 +99,6 @@
     @Override
     public Result getNext(Integer i) throws ExecException {
         PhysicalOperator in = inputs.get(0);
-        Byte castToType = DataType.INTEGER;
         Byte resultType = in.getResultType();
         switch (resultType) {
         case DataType.BAG: {
@@ -117,33 +117,21 @@
             DataByteArray dba = null;
             Result res = in.getNext(dba);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-                // res.result = new
-                // 
Integer(Integer.valueOf((((DataByteArray)res.result).toString())));
-                if (castNotNeeded) {
-                    // we examined the data once before and
-                    // determined that the input is the same
-                    // type as the type we are casting to
-                    // so just send the input out as output
-                    return res;
-                }
                 try {
                     dba = (DataByteArray) res.result;
                 } catch (ClassCastException e) {
-                    // check if the type of res.result is
-                    // same as the type we are trying to cast to
-                    if (DataType.findType(res.result) == castToType) {
-                        // remember this for future calls
-                        castNotNeeded = true;
-                        // just return the output
-                        return res;
-                    } else {
-                        // the input is a differen type
-                        // rethrow the exception
-                        int errCode = 1081;
-                        String msg = "Cannot cast to int. Expected bytearray 
but received: " + DataType.findTypeName(res.result);
-                        throw new ExecException(msg, errCode, 
PigException.INPUT, e);
+                    // res.result is not of type ByteArray. But it can be one 
of the types from which cast is still possible.
+                    if (realType == null)
+                        // Find the type and cache it.
+                        realType = DataType.findType(res.result);
+                    try {
+                        res.result = DataType.toInteger(res.result, realType);
+                    } catch (ClassCastException cce) {
+                        // Type has changed. Need to find type again and try 
casting it again.
+                        realType = DataType.findType(res.result);
+                        res.result = DataType.toInteger(res.result, realType);
                     }
-
+                    return res;
                 }
                 try {
                     if (null != load) {
@@ -232,7 +220,6 @@
     @Override
     public Result getNext(Long l) throws ExecException {
         PhysicalOperator in = inputs.get(0);
-        Byte castToType = DataType.LONG;
         Byte resultType = in.getResultType();
         switch (resultType) {
         case DataType.BAG: {
@@ -257,33 +244,21 @@
             DataByteArray dba = null;
             Result res = in.getNext(dba);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-                if (castNotNeeded) {
-                    // we examined the data once before and
-                    // determined that the input is the same
-                    // type as the type we are casting to
-                    // so just send the input out as output
-                    return res;
-                }
-                // res.result = new
-                // 
Long(Long.valueOf((((DataByteArray)res.result).toString())));
                 try {
                     dba = (DataByteArray) res.result;
                 } catch (ClassCastException e) {
-                    // check if the type of res.result is
-                    // same as the type we are trying to cast to
-                    if (DataType.findType(res.result) == castToType) {
-                        // remember this for future calls
-                        castNotNeeded = true;
-                        // just return the output
-                        return res;
-                    } else {
-                        // the input is a differen type
-                        // rethrow the exception
-                        int errCode = 1081;
-                        String msg = "Cannot cast to long. Expected bytearray 
but received: " + DataType.findTypeName(res.result);
-                        throw new ExecException(msg, errCode, 
PigException.INPUT, e);
+                    // res.result is not of type ByteArray. But it can be one 
of the types from which cast is still possible.
+                    if (realType == null)
+                        // Find the type in first call and cache it.
+                        realType = DataType.findType(res.result);
+                    try {
+                        res.result = DataType.toLong(res.result, realType);
+                    } catch (ClassCastException cce) {
+                        // Type has changed. Need to find type again and try 
casting it again.
+                        realType = DataType.findType(res.result);
+                        res.result = DataType.toLong(res.result, realType);
                     }
-
+                    return res;
                 }
                 try {
                     if (null != load) {
@@ -367,7 +342,6 @@
     @Override
     public Result getNext(Double d) throws ExecException {
         PhysicalOperator in = inputs.get(0);
-        Byte castToType = DataType.DOUBLE;
         Byte resultType = in.getResultType();
         switch (resultType) {
         case DataType.BAG: {
@@ -392,33 +366,21 @@
             DataByteArray dba = null;
             Result res = in.getNext(dba);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-                // res.result = new
-                // 
Double(Double.valueOf((((DataByteArray)res.result).toString())));
-                if (castNotNeeded) {
-                    // we examined the data once before and
-                    // determined that the input is the same
-                    // type as the type we are casting to
-                    // so just send the input out as output
-                    return res;
-                }
                 try {
                     dba = (DataByteArray) res.result;
                 } catch (ClassCastException e) {
-                    // check if the type of res.result is
-                    // same as the type we are trying to cast to
-                    if (DataType.findType(res.result) == castToType) {
-                        // remember this for future calls
-                        castNotNeeded = true;
-                        // just return the output
-                        return res;
-                    } else {
-                        // the input is a differen type
-                        // rethrow the exception
-                        int errCode = 1081;
-                        String msg = "Cannot cast to double. Expected 
bytearray but received: " + DataType.findTypeName(res.result);
-                        throw new ExecException(msg, errCode, 
PigException.INPUT, e);
+                    // res.result is not of type ByteArray. But it can be one 
of the types from which cast is still possible.
+                    if (realType == null)
+                        // Find the type in first call and cache it.
+                        realType = DataType.findType(res.result);
+                    try {
+                        res.result = DataType.toDouble(res.result, realType);
+                    } catch (ClassCastException cce) {
+                        // Type has changed. Need to find type again and try 
casting it again.
+                        realType = DataType.findType(res.result);
+                        res.result = DataType.toDouble(res.result, realType);
                     }
-
+                    return res;
                 }
                 try {
                     if (null != load) {
@@ -501,7 +463,6 @@
     @Override
     public Result getNext(Float f) throws ExecException {
         PhysicalOperator in = inputs.get(0);
-        Byte castToType = DataType.FLOAT;
         Byte resultType = in.getResultType();
         switch (resultType) {
         case DataType.BAG: {
@@ -526,33 +487,21 @@
             DataByteArray dba = null;
             Result res = in.getNext(dba);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-                // res.result = new
-                // 
Float(Float.valueOf((((DataByteArray)res.result).toString())));
-                if (castNotNeeded) {
-                    // we examined the data once before and
-                    // determined that the input is the same
-                    // type as the type we are casting to
-                    // so just send the input out as output
-                    return res;
-                }
                 try {
                     dba = (DataByteArray) res.result;
                 } catch (ClassCastException e) {
-                    // check if the type of res.result is
-                    // same as the type we are trying to cast to
-                    if (DataType.findType(res.result) == castToType) {
-                        // remember this for future calls
-                        castNotNeeded = true;
-                        // just return the output
-                        return res;
-                    } else {
-                        // the input is a differen type
-                        // rethrow the exception
-                        int errCode = 1081;
-                        String msg = "Cannot cast to float. Expected bytearray 
but received: " + DataType.findTypeName(res.result);
-                        throw new ExecException(msg, errCode, 
PigException.INPUT, e);
+                    // res.result is not of type ByteArray. But it can be one 
of the types from which cast is still possible.
+                    if (realType == null)
+                        // Find the type in first call and cache it.
+                        realType = DataType.findType(res.result);
+                    try {
+                        res.result = DataType.toFloat(res.result, realType);
+                    } catch (ClassCastException cce) {
+                        // Type has changed. Need to find type again and try 
casting it again.
+                        realType = DataType.findType(res.result);
+                        res.result = DataType.toFloat(res.result, realType);
                     }
-
+                    return res;
                 }
                 try {
                     if (null != load) {
@@ -637,7 +586,6 @@
     @Override
     public Result getNext(String str) throws ExecException {
         PhysicalOperator in = inputs.get(0);
-        Byte castToType = DataType.CHARARRAY;
         Byte resultType = in.getResultType();
         switch (resultType) {
         case DataType.BAG: {
@@ -662,33 +610,21 @@
             DataByteArray dba = null;
             Result res = in.getNext(dba);
             if (res.returnStatus == POStatus.STATUS_OK && res.result != null) {
-                // res.result = new
-                // String(((DataByteArray)res.result).toString());
-                if (castNotNeeded) {
-                    // we examined the data once before and
-                    // determined that the input is the same
-                    // type as the type we are casting to
-                    // so just send the input out as output
-                    return res;
-                }
                 try {
                     dba = (DataByteArray) res.result;
                 } catch (ClassCastException e) {
-                    // check if the type of res.result is
-                    // same as the type we are trying to cast to
-                    if (DataType.findType(res.result) == castToType) {
-                        // remember this for future calls
-                        castNotNeeded = true;
-                        // just return the output
-                        return res;
-                    } else {
-                        // the input is a differen type
-                        // rethrow the exception
-                        int errCode = 1081;
-                        String msg = "Cannot cast to chararray. Expected 
bytearray but received: " + DataType.findTypeName(res.result);
-                        throw new ExecException(msg, errCode, 
PigException.INPUT, e);
+                    // res.result is not of type ByteArray. But it can be one 
of the types from which cast is still possible.
+                    if (realType == null)
+                        // Find the type in first call and cache it.
+                        realType = DataType.findType(res.result);
+                    try {
+                        res.result = DataType.toString(res.result, realType);
+                    } catch (ClassCastException cce) {
+                        // Type has changed. Need to find type again and try 
casting it again.
+                        realType = DataType.findType(res.result);
+                        res.result = DataType.toString(res.result, realType);
                     }
-
+                    return res;
                 }
                 try {
                     if (null != load) {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=781810&r1=781809&r2=781810&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Thu Jun  4 17:48:16 
2009
@@ -381,9 +381,9 @@
      * @return The object as a Integer.
      * @throws ExecException if the type can't be forced to an Integer.
      */
-    public static Integer toInteger(Object o) throws ExecException {
+    public static Integer toInteger(Object o,byte type) throws ExecException {
         try {
-                       switch (findType(o)) {
+                       switch (type) {
                        case BOOLEAN:
                            if (((Boolean)o) == true) return new Integer(1);
                            else return new Integer(0);
@@ -422,7 +422,9 @@
                            " to an Integer";
                            throw new ExecException(msg, errCode, 
PigException.INPUT);
                        }
-               } catch (ExecException ee) {
+               } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
                        throw ee;
                } catch (NumberFormatException nfe) {
                        int errCode = 1074;
@@ -434,7 +436,17 @@
                        throw new ExecException(msg, errCode, PigException.BUG);
                }
     }
-
+    /**
+     * If type of object is not known, use this method, which internally calls
+     * toInteger(object,type)
+     * 
+     * @param o
+     * @return Object as Integer.
+     * @throws ExecException
+     */
+    public static Integer toInteger(Object o) throws ExecException {
+        return toInteger(o, findType(o));
+    }
     /**
      * Force a data object to a Long, if possible.  Any numeric type
      * can be forced to a Long (though precision may be lost), as well
@@ -445,9 +457,9 @@
      * @return The object as a Long.
      * @throws ExecException if the type can't be forced to a Long.
      */
-    public static Long toLong(Object o) throws ExecException {
+    public static Long toLong(Object o,byte type) throws ExecException {
         try {
-                       switch (findType(o)) {
+                       switch (type) {
                        case BOOLEAN:
                            if (((Boolean)o) == true) return new Long(1);
                            else return new Long(0);
@@ -486,7 +498,9 @@
                            " to a Long";
                            throw new ExecException(msg, errCode, 
PigException.INPUT);
                        }
-               } catch (ExecException ee) {
+               } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
                        throw ee;
                } catch (NumberFormatException nfe) {
                        int errCode = 1074;
@@ -499,6 +513,17 @@
                }
 
     }
+    /**
+     * If type of object is not known, use this method which in turns call
+     * toLong(object,type) after finding type.
+     * 
+     * @param o
+     * @return Object as Long.
+     * @throws ExecException
+     */
+    public static Long toLong(Object o) throws ExecException {
+        return toLong(o, findType(o));
+    }
 
     /**
      * Force a data object to a Float, if possible.  Any numeric type
@@ -510,9 +535,9 @@
      * @return The object as a Float.
      * @throws ExecException if the type can't be forced to a Float.
      */
-    public static Float toFloat(Object o) throws ExecException {
+    public static Float toFloat(Object o,byte type) throws ExecException {
         try {
-                       switch (findType(o)) {
+                       switch (type) {
                        case INTEGER:
                            return new Float(((Integer)o).floatValue());
 
@@ -546,7 +571,9 @@
                            " to a Float";
                            throw new ExecException(msg, errCode, 
PigException.INPUT);
                        }
-               } catch (ExecException ee) {
+               } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
                        throw ee;
                } catch (NumberFormatException nfe) {
                        int errCode = 1074;
@@ -558,6 +585,17 @@
                        throw new ExecException(msg, errCode, PigException.BUG);
                }
     }
+    /**
+     * If type of object is not known, use this method which in turns call
+     * toFloat(object,type) after finding type.
+     * 
+     * @param o
+     * @return Object as Float.
+     * @throws ExecException
+     */
+    public static Float toFloat(Object o) throws ExecException {
+        return toFloat(o, findType(o));
+    }
 
     /**
      * Force a data object to a Double, if possible.  Any numeric type
@@ -569,9 +607,9 @@
      * @return The object as a Double.
      * @throws ExecException if the type can't be forced to a Double.
      */
-    public static Double toDouble(Object o) throws ExecException {
+    public static Double toDouble(Object o,byte type) throws ExecException {
         try {
-                       switch (findType(o)) {
+                       switch (type) {
                        case INTEGER:
                            return new Double(((Integer)o).doubleValue());
 
@@ -605,7 +643,9 @@
                            " to a Double";
                            throw new ExecException(msg, errCode, 
PigException.INPUT);
                        }
-               } catch (ExecException ee) {
+               } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
                        throw ee;
                } catch (NumberFormatException nfe) {
                        int errCode = 1074;
@@ -617,6 +657,17 @@
                        throw new ExecException(msg, errCode, PigException.BUG);
                }
     }
+    /**
+     * If type of object is not known, use this method which in turns call
+     * toLong(object,type) after finding type.
+     * 
+     * @param o
+     * @return Object as Double.
+     * @throws ExecException
+     */
+    public static Double toDouble(Object o) throws ExecException {
+        return toDouble(o, findType(o));
+    }
 
     /**
      * Force a data object to a String, if possible.  Any simple (atomic) type
@@ -627,9 +678,9 @@
      * @return The object as a String.
      * @throws ExecException if the type can't be forced to a String.
      */
-    public static String toString(Object o) throws ExecException {
+    public static String toString(Object o,byte type) throws ExecException {
         try {
-                       switch (findType(o)) {
+                       switch (type) {
                        case INTEGER:
                            return ((Integer)o).toString();
 
@@ -667,7 +718,9 @@
                            " to a String";
                            throw new ExecException(msg, errCode, 
PigException.INPUT);
                        }
-               } catch (ExecException ee) {
+               } catch (ClassCastException cce) {
+            throw cce;
+        } catch (ExecException ee) {
                        throw ee;
                } catch (Exception e) {
                        int errCode = 2054;
@@ -675,7 +728,17 @@
                        throw new ExecException(msg, errCode, PigException.BUG);
                }
     }
-
+    /**
+     * If type of object is not known, use this method which in turns call
+     * toString(object,type) after finding type.
+     * 
+     * @param o
+     * @return Object as String.
+     * @throws ExecException
+     */
+    public static String toString(Object o) throws ExecException {
+        return toString(o, findType(o));
+    }
     /**
      * If this object is a map, return it as a map.
      * This isn't particularly efficient, so if you

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPOCast.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPOCast.java?rev=781810&r1=781809&r2=781810&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPOCast.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPOCast.java Thu Jun  4 
17:48:16 2009
@@ -56,6 +56,9 @@
 
        Random r = new Random();
        final int MAX = 10;
+       Tuple dummyTuple = null;
+       Map<Object,Object> dummyMap = null;
+       DataBag dummyBag = null;
        
        @Test
        public void testIntegerToOther() throws PlanException, ExecException {
@@ -77,6 +80,10 @@
                plan.connect(prj, op);
                
                prj.setResultType(DataType.INTEGER);
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
@@ -86,6 +93,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(i, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -96,6 +108,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(f, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(f);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(f, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -106,6 +123,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(l, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(l);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(l, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -116,6 +138,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(d, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(d);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(d, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -126,6 +153,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(str, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(str, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -136,6 +168,11 @@
                        if(res.returnStatus == POStatus.STATUS_OK) {
                                assertEquals(dba, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(dba, res.result);
+                       }
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -160,6 +197,30 @@
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyMap);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyTuple);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyBag);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
        }
        
        @Test
@@ -183,6 +244,11 @@
                
                prj.setResultType(DataType.LONG);
                
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
+               
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
@@ -192,6 +258,12 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
+                       
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -200,9 +272,14 @@
                        Float f = ((Long)t.get(0)).floatValue();
                        Result res = op.getNext(f);
                        if(res.returnStatus == POStatus.STATUS_OK) {
-//                             System.out.println(res.result + " : " + f);
+//                        System.out.println(res.result + " : " + f);
                                assertEquals(f, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(f);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(f, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -214,6 +291,11 @@
                                //System.out.println(res.result + " : " + l);
                                assertEquals(l, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(l);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(l, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -225,6 +307,11 @@
                                //System.out.println(res.result + " : " + d);
                                assertEquals(d, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(d);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(d, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -236,6 +323,11 @@
                                //System.out.println(res.result + " : " + str);
                                assertEquals(str, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(str, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -247,6 +339,11 @@
                                //System.out.println(res.result + " : " + dba);
                                assertEquals(dba, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(dba, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -271,6 +368,30 @@
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyMap);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyTuple);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyBag);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
        }
        
        @Test
@@ -294,6 +415,11 @@
                
                prj.setResultType(DataType.FLOAT);
                
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
+               
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
@@ -303,6 +429,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -311,9 +441,13 @@
                        Float f = ((Float)t.get(0)).floatValue();
                        Result res = op.getNext(f);
                        if(res.returnStatus == POStatus.STATUS_OK) {
-//                             System.out.println(res.result + " : " + f);
+//                       System.out.println(res.result + " : " + f);
                                assertEquals(f, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(f);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(f, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -325,6 +459,10 @@
                                //System.out.println(res.result + " : " + l);
                                assertEquals(l, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(l);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(l, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -336,6 +474,10 @@
                                //System.out.println(res.result + " : " + d);
                                assertEquals(d, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(d);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(d, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -347,6 +489,10 @@
                                //System.out.println(res.result + " : " + str);
                                assertEquals(str, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(str, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -358,17 +504,21 @@
                                //System.out.println(res.result + " : " + dba);
                                assertEquals(dba, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(dba, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-                       Float result  = (Float)op.getNext((Float)null).result;
+                       if(t.get(0) == null) {
+                       
+                                         Float result = (Float) 
op.getNext((Float) null).result;
                                assertEquals( null, result);
 
-            } 
+                       }
                }
 
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -393,6 +543,30 @@
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyMap);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyTuple);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyBag);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
        }
        
        @Test
@@ -416,6 +590,11 @@
                
                prj.setResultType(DataType.DOUBLE);
                
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
+               
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
@@ -425,6 +604,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -433,9 +616,13 @@
                        Float f = ((Double)t.get(0)).floatValue();
                        Result res = op.getNext(f);
                        if(res.returnStatus == POStatus.STATUS_OK) {
-//                             System.out.println(res.result + " : " + f);
+//                       System.out.println(res.result + " : " + f);
                                assertEquals(f, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(f);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(f, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -447,6 +634,10 @@
                                //System.out.println(res.result + " : " + l);
                                assertEquals(l, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(l);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(l, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -458,6 +649,10 @@
                                //System.out.println(res.result + " : " + d);
                                assertEquals(d, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(d);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(d, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -469,8 +664,12 @@
                                //System.out.println(res.result + " : " + str);
                                assertEquals(str, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(str, res.result);
                }
-               
+
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
@@ -480,6 +679,10 @@
                                //System.out.println(res.result + " : " + dba);
                                assertEquals(dba, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(dba, res.result);
                }
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
@@ -504,6 +707,30 @@
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
+               {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyMap);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyTuple);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyBag);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
        }
        
        @Test
@@ -518,6 +745,11 @@
                plan.connect(prj, op);
                
                prj.setResultType(DataType.CHARARRAY);
+
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
                
                TupleFactory tf = TupleFactory.getInstance();
                
@@ -531,6 +763,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -543,6 +779,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -555,6 +795,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -567,6 +811,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -579,6 +827,10 @@
                                //System.out.println(res.result + " : " + str);
                                assertEquals(str, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(str, res.result);
                }
                
                {
@@ -592,6 +844,11 @@
                                //System.out.println(res.result + " : " + dba);
                                assertEquals(dba, res.result);
                        }
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(dba, res.result);
                }
                
                {
@@ -610,7 +867,6 @@
                        Result res = op.getNext(t);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
-               
                {
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandString(r));
@@ -619,6 +875,30 @@
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
+               {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyMap);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyTuple);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
+        {
+            planToTestBACasts.attachInput(dummyTuple);
+            try{
+                opWithInputTypeAsBA.getNext(dummyBag);
+            }catch (Exception e) {
+                assertEquals(ExecException.class, e.getClass());
+            }
+        }
        }
        
        public static class TestLoader implements LoadFunc{
@@ -639,15 +919,15 @@
         }
 
         public Boolean bytesToBoolean(byte[] b) throws IOException {
-               DataByteArray dba = new DataByteArray(b);
-               String str = dba.toString();
-               if(str.length() == 0)
-                       return new Boolean(false);
-               else return new Boolean(true);
+            DataByteArray dba = new DataByteArray(b);
+            String str = dba.toString();
+            if(str.length() == 0)
+                return new Boolean(false);
+            else return new Boolean(true);
         }
         
         public String bytesToCharArray(byte[] b) throws IOException {
-               DataByteArray dba = new DataByteArray(b);
+            DataByteArray dba = new DataByteArray(b);
             return dba.toString();
         }
         
@@ -668,45 +948,44 @@
         }
 
         public Map<Object, Object> bytesToMap(byte[] b) throws IOException {
-            return null;
+          return null;
         }
 
         public Tuple bytesToTuple(byte[] b) throws IOException {
             return null;
-        }        
+        }
 
-           public byte[] toBytes(DataBag bag) throws IOException {
-               return null;
-           }
-       
-           public byte[] toBytes(String s) throws IOException {
-               return s.getBytes();
-           }
-       
-           public byte[] toBytes(Double d) throws IOException {
-               return d.toString().getBytes();
-           }
-       
-           public byte[] toBytes(Float f) throws IOException {
-               return f.toString().getBytes();
-           }
-       
-           public byte[] toBytes(Integer i) throws IOException {
-               return i.toString().getBytes();
-           }
-       
-           public byte[] toBytes(Long l) throws IOException {
-               return l.toString().getBytes();
-           }
-       
-           public byte[] toBytes(Map<Object, Object> m) throws IOException {
-               return null;
-           }
+        public byte[] toBytes(DataBag bag) throws IOException {
+            return null;
+        }
        
-           public byte[] toBytes(Tuple t) throws IOException {
-               return null;
-           }
-
+        public byte[] toBytes(String s) throws IOException {
+            return s.getBytes();
+        }
+        
+        public byte[] toBytes(Double d) throws IOException {
+            return d.toString().getBytes();
+        }
+        
+        public byte[] toBytes(Float f) throws IOException {
+            return f.toString().getBytes();
+        }
+        
+        public byte[] toBytes(Integer i) throws IOException {
+            return i.toString().getBytes();
+        }
+        
+        public byte[] toBytes(Long l) throws IOException {
+            return l.toString().getBytes();
+        }
+        
+        public byte[] toBytes(Map<Object, Object> m) throws IOException {
+            return null;
+        }
+        
+        public byte[] toBytes(Tuple t) throws IOException {
+            return null;
+        }
         /* (non-Javadoc)
          * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, 
org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
          */
@@ -715,7 +994,7 @@
             // TODO Auto-generated method stub
             return null;
         }
-    }
+       }
        
        @Test
        public void testByteArrayToOther() throws PlanException, ExecException {
@@ -732,6 +1011,10 @@
                
                TupleFactory tf = TupleFactory.getInstance();
                
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
                {
                        Tuple t = tf.newTuple();
                        t.append(new DataByteArray((new 
Integer(r.nextInt())).toString().getBytes()));
@@ -742,9 +1025,14 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
-                                               
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
+
                }
-               
+
                {
                        Tuple t = tf.newTuple();
                        t.append(new DataByteArray((new 
Float(r.nextFloat())).toString().getBytes()));
@@ -755,6 +1043,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -767,6 +1059,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -779,6 +1075,10 @@
                                //System.out.println(res.result + " : " + i);
                                assertEquals(i, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(i, res.result);
                }
                
                {
@@ -791,16 +1091,25 @@
                                //System.out.println(res.result + " : " + str);
                                assertEquals(str, res.result);
                        }
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(str);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(str, res.result);
                }
                
                {
                        Tuple t = tf.newTuple();
                        t.append(new 
DataByteArray(GenRandomData.genRandString(r).getBytes()));
-               
+                       
                        plan.attachInput(t);
                        DataByteArray dba = (DataByteArray) t.get(0);
                        Result res = op.getNext(dba);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(dba);
+                       if(res.returnStatus == POStatus.STATUS_OK)
+                               assertEquals(POStatus.STATUS_ERR, 
res.returnStatus);
                }
                
                {
@@ -812,6 +1121,11 @@
                        //assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                        assertEquals(POStatus.STATUS_OK, res.returnStatus);
                        assertEquals(null, res.result);
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(map);
+                       assertEquals(POStatus.STATUS_OK, res.returnStatus);
+                       assertEquals(null, res.result);
                }
                
                {
@@ -822,6 +1136,11 @@
                        //assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                        assertEquals(POStatus.STATUS_OK, res.returnStatus);
                        assertEquals(null, res.result);
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(t);
+                       assertEquals(POStatus.STATUS_OK, res.returnStatus);
+                       assertEquals(null, res.result);
                }
                
                {
@@ -833,11 +1152,16 @@
                        //assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                        assertEquals(POStatus.STATUS_OK, res.returnStatus);
                        assertEquals(null, res.result);
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(b);
+                       assertEquals(POStatus.STATUS_OK, res.returnStatus);
+                       assertEquals(null, res.result);
                }
        }
        
        private PhysicalPlan constructPlan(POCast op) throws PlanException {
-           LoadFunc load = new TestLoader();
+        LoadFunc load = new TestLoader();
         op.setLoadFSpec(new FuncSpec(load.getClass().getName()));
         POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 
0);
         PhysicalPlan plan = new PhysicalPlan();
@@ -849,7 +1173,7 @@
        }
        
        /* 
-        * Test that if the input type is actually same 
+     * Test that if the input type is actually same 
      * as output type and we think that the input type is a
      * bytearray we still can handle it. This can happen in the
      * following situation:
@@ -862,7 +1186,7 @@
      * the input to the cast is already a string
      */
        @Test
-    public void testByteArrayToOtherNoCast() throws PlanException, 
ExecException {
+       public void testByteArrayToOtherNoCast() throws PlanException, 
ExecException {
         POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
         PhysicalPlan plan = constructPlan(op);
         TupleFactory tf = TupleFactory.getInstance();
@@ -983,7 +1307,7 @@
             }
         }
         
-    }
+       }
        
        @Test
        public void testTupleToOther() throws PlanException, ExecException {
@@ -999,6 +1323,11 @@
                
                TupleFactory tf = TupleFactory.getInstance();
                
+               //Plan to test when result type is ByteArray and casting is 
requested
+               //for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
+               
                {
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandString(r));
@@ -1019,6 +1348,10 @@
                        Result res = op.getNext(t);
                        //System.out.println(res.result + " : " + t);
                        assertEquals(t, res.result);
+                       
+                       planToTestBACasts.attachInput(tNew);
+                       res = opWithInputTypeAsBA.getNext(t);
+                       assertEquals(t, res.result);
                }
                
                {
@@ -1075,7 +1408,7 @@
                        Result res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
-               
+             
                {
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandString(r));
@@ -1096,8 +1429,8 @@
                        DataByteArray i = null;
                        Result res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-
-                   op.setLoadFSpec(new FuncSpec(BinStorage.class.getName()));
+                       
+                       op.setLoadFSpec(new 
FuncSpec(BinStorage.class.getName()));
                        plan.attachInput(tNew);
                        res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
@@ -1117,7 +1450,11 @@
                prj.setResultType(DataType.BAG);
                
                TupleFactory tf = TupleFactory.getInstance();
-               BagFactory bf = BagFactory.getInstance();
+               
+               //Plan to test when result type is ByteArray and casting is 
requested
+               //for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
                {
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 
100));
@@ -1143,6 +1480,10 @@
                        Result res = op.getNext(b);
                        //System.out.println(res.result + " : " + t);
                        assertEquals(b, res.result);
+                       
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(b);
+                       assertEquals(b, res.result);
                }
                
                {
@@ -1165,7 +1506,7 @@
                
                {
                        Tuple t = tf.newTuple();
-                       t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 
100));
+                       t.append(GenRandomData.genRandSmallTupDataBag(r, 1, 
100));       
                        plan.attachInput(t);
                        Float i = null;
                        Result res = op.getNext(i);
@@ -1197,8 +1538,8 @@
                        DataByteArray i = null;
                        Result res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
-
-                   op.setLoadFSpec(new FuncSpec(BinStorage.class.getName()));
+                       
+                       op.setLoadFSpec(new 
FuncSpec(BinStorage.class.getName()));
                        plan.attachInput(t);
                        res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
@@ -1214,11 +1555,13 @@
                plan.add(prj);
                plan.add(op);
                plan.connect(prj, op);
-               
                prj.setResultType(DataType.MAP);
                
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
                TupleFactory tf = TupleFactory.getInstance();
-               BagFactory bf = BagFactory.getInstance();
                {
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandMap(r, 10));
@@ -1227,6 +1570,10 @@
                        Result res = op.getNext(map);
                        //System.out.println(res.result + " : " + t);
                        assertEquals(map, res.result);
+                    
+                       planToTestBACasts.attachInput(t);
+                       res = opWithInputTypeAsBA.getNext(map);
+                       assertEquals(map, res.result);
                }
                
                {
@@ -1235,6 +1582,7 @@
                        plan.attachInput(t);
                        Result res = op.getNext(t);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+                       
                }
                
                {
@@ -1244,6 +1592,7 @@
                        DataBag b = null;
                        Result res = op.getNext(b);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
+                       
                }
                
                {
@@ -1254,7 +1603,7 @@
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
                }
                
-               {
+               {     
                        Tuple t = tf.newTuple();
                        t.append(GenRandomData.genRandMap(r, 10));
                        Long i = null;
@@ -1294,7 +1643,7 @@
                        Result res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
 
-                   op.setLoadFSpec(new FuncSpec(BinStorage.class.getName()));
+                       op.setLoadFSpec(new 
FuncSpec(BinStorage.class.getName()));
                        plan.attachInput(t);
                        res = op.getNext(i);
                        assertEquals(POStatus.STATUS_ERR, res.returnStatus);
@@ -1309,33 +1658,33 @@
                        Tuple t = TupleFactory.getInstance().newTuple();
                        t.append(r.nextInt());
                        bag.add(t);
-            if( r.nextInt(3) % 3 == 0 ){
-               t = TupleFactory.getInstance().newTuple();
-               t.append(null);
-                   bag.add(t);
-            }
-
+                       if(r.nextInt(3) % 3 == 0) {
+                               t = TupleFactory.getInstance().newTuple();
+                               t.append(null);
+                               bag.add(t);
+                       }
+                       
                }
                
                POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
                POProject prj = new POProject(new OperatorKey("", 
r.nextLong()), -1, 0);
-               PhysicalPlan plan = new PhysicalPlan();
+               PhysicalPlan plan = new PhysicalPlan();       
                plan.add(prj);
                plan.add(op);
                plan.connect(prj, op);
                
-               prj.setResultType(DataType.INTEGER);
+               prj.setResultType(DataType.INTEGER);      
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-                       Integer result  = 
(Integer)op.getNext((Integer)null).result;
+                       if(t.get(0) == null) {
+                               
+                               Integer result  = 
(Integer)op.getNext((Integer)null).result;
                                assertEquals( null, result);
 
-            } 
-            
+                       } 
+                       
                }
                
                prj.setResultType(DataType.FLOAT);
@@ -1343,25 +1692,26 @@
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-                       Float result  = (Float)op.getNext((Float)null).result;
+                       if(t.get(0) == null) {
+                               
+                               Integer result  = 
(Integer)op.getNext((Integer)null).result;
                                assertEquals( null, result);
 
-            } 
+                       } 
+                       
                }
-
+               
                prj.setResultType(DataType.DOUBLE);
                
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-                       Double result  = 
(Double)op.getNext((Double)null).result;
-                               assertEquals( null, result);
+                       if(t.get(0) == null) {
+                               
+                               Double result = (Double) op.getNext((Double) 
null).result;
+                       assertEquals(null, result);
 
-            } 
+                       }
                }
                
                prj.setResultType(DataType.CHARARRAY);
@@ -1369,12 +1719,13 @@
                for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
                        Tuple t = it.next();
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-                       String result  = 
(String)op.getNext((String)null).result;
+                       if(t.get(0) == null) {
+                               
+                               String result  = 
(String)op.getNext((String)null).result;
                                assertEquals( null, result);
 
-            } 
+                       } 
+                       
                }
                
                prj.setResultType(DataType.BYTEARRAY);
@@ -1385,16 +1736,72 @@
                        Tuple t = tf.newTuple();
                        t.append(new DataByteArray((new 
Integer(r.nextInt())).toString().getBytes()));
                        plan.attachInput(t);
-            if(t.get(0) == null) {
-               
-               DataByteArray result  = 
(DataByteArray)op.getNext((String)null).result;
-                               assertEquals( null, result);
+                       if(t.get(0) == null) {
+                               
+                               DataByteArray result = (DataByteArray) 
op.getNext((String) null).result;
+                               assertEquals(null, result);
+                               
+                       }
+                       
+               }
+               
+       }
+       
+       @Test
+       public void testValueTypesChanged() throws PlanException, ExecException 
{
 
-            } 
+               // Plan to test when result type is ByteArray and casting is 
requested
+               // for example casting of values coming out of map lookup.
+               POCast opWithInputTypeAsBA = new POCast(new OperatorKey("", 
r.nextLong()), -1);
+               PhysicalPlan planToTestBACasts = 
constructPlan(opWithInputTypeAsBA);
+               DataBag bag = BagFactory.getInstance().newDefaultBag();
 
+               // Create a bag having tuples having values of different types.
+               for(int i = 0; i < MAX; i++) {
+                       Tuple t = TupleFactory.getInstance().newTuple();
+                       if(i % 4 == 0)
+                               t.append(r.nextInt());
+                       if(i % 4 == 1)
+                               t.append(r.nextLong());
+                       if(i % 4 == 2)
+                               t.append(r.nextDouble());
+                       if(i % 4 == 3)
+                               t.append(r.nextFloat());
+                       bag.add(t);
                }
 
+               for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+                       Tuple t = it.next();
+                       planToTestBACasts.attachInput(t);
+                       Object toCast = t.get(0);
+                       Integer i = DataType.toInteger(toCast);
+                       Result res = opWithInputTypeAsBA.getNext(i);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(i, res.result);
+                       }
+                       Long l = DataType.toLong(toCast);
+                       res = opWithInputTypeAsBA.getNext(l);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(l, res.result);
+                       }
 
-       
+                       Float f = DataType.toFloat(toCast);
+                       res = opWithInputTypeAsBA.getNext(f);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(f, res.result);
+                       }
+
+                       Double d = DataType.toDouble(toCast);
+                       res = opWithInputTypeAsBA.getNext(d);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(d, res.result);
+                       }
+
+                       String s = DataType.toString(toCast);
+                       res = opWithInputTypeAsBA.getNext(s);
+                       if(res.returnStatus == POStatus.STATUS_OK) {
+                               assertEquals(s, res.result);
+                       }
+               }
        }
 }


Reply via email to