Author: pradeepkth
Date: Fri May 22 20:29:46 2009
New Revision: 777696

URL: http://svn.apache.org/viewvc?rev=777696&view=rev
Log:
PIG-814:Make Binstorage more robust when data contains record markers 
(pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=777696&r1=777695&r2=777696&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri May 22 20:29:46 2009
@@ -48,6 +48,8 @@
 
 BUG FIXES
 
+PIG-814: Make Binstorage more robust when data contains record markers (pradeepkth)
+
 PIG-811: Globs with "?" in the pattern are broken in local mode (hagleitn via
 olgan)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=777696&r1=777695&r2=777696&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Fri May 22 20:29:46 2009
@@ -228,7 +228,7 @@
                                     spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
                                }
                                boolean isSplittable = inputs.get(i).second;
-                               if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+                               if ((spec.getSlicer() instanceof PigSlicer)) {
                                    
((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
                                }
                                 Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=777696&r1=777695&r2=777696&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri May 22 20:29:46 2009
@@ -98,10 +98,17 @@
                 continue;
             }
             if(b == -1) return null;
+            b = (byte) in.read();
+            if(b != DataType.TUPLE && b != -1) {
+                continue;
+            }
+            if(b == -1) return null;
             break;
         }
         try {
-            return (Tuple)DataReaderWriter.readDatum(inData);
+            // if we got here, we have seen RECORD_1-RECORD_2-RECORD_3-TUPLE_MARKER
+            // sequence - lets now read the contents of the tuple 
+            return (Tuple)DataReaderWriter.readDatum(inData, DataType.TUPLE);
         } catch (ExecException ee) {
             throw ee;
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java?rev=777696&r1=777695&r2=777696&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataReaderWriter.java Fri May 22 20:29:46 2009
@@ -88,11 +88,14 @@
         return new String(ba, DataReaderWriter.UTF8);
     }
     
-        
     public static Object readDatum(DataInput in) throws IOException, ExecException {
         // Read the data type
         byte b = in.readByte();
-        switch (b) {
+        return readDatum(in, b);
+    }
+        
+    public static Object readDatum(DataInput in, byte type) throws IOException, ExecException {
+        switch (type) {
             case DataType.TUPLE: 
                 return bytesToTuple(in);
             
@@ -138,7 +141,7 @@
                 return null;
 
             default:
-                throw new RuntimeException("Unexpected data type " + b +
+                throw new RuntimeException("Unexpected data type " + type +
                     " found in stream.");
         }
     }


Reply via email to